HADOOP-18456. NullPointerException in ObjectListingIterator. (#4909)

This problem surfaced in impala integration tests
   IMPALA-11592. TestLocalCatalogRetries.test_fetch_metadata_retry fails in S3 build
after the change
  HADOOP-17461. Add thread-level IOStatistics Context
The actual GC race condition came with
 HADOOP-18091. S3A auditing leaks memory through ThreadLocal references

The fix for this is, if our hypothesis is correct, in WeakReferenceMap.create()
where a strong reference to the new value is kept in a local variable
*and referred to later* so that the JVM will not GC it.

Along with the fix, extra assertions ensure that if the problem is not fixed,
applications will fail faster/more meaningfully.

Contributed by Steve Loughran.
This commit is contained in:
Steve Loughran 2022-09-23 09:54:31 +01:00
parent ceec19e61a
commit af0a6d7987
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
5 changed files with 220 additions and 26 deletions

View File

@ -25,6 +25,8 @@ import javax.annotation.Nullable;
import org.apache.hadoop.util.WeakReferenceMap;
import static java.util.Objects.requireNonNull;
/**
* A WeakReferenceMap for threads.
* @param <V> value type of the map
@ -36,30 +38,55 @@ public class WeakReferenceThreadMap<V> extends WeakReferenceMap<Long, V> {
super(factory, referenceLost);
}
/**
* Get the value for the current thread, creating if needed.
* @return an instance.
*/
public V getForCurrentThread() {
return get(currentThreadId());
}
/**
* Remove the reference for the current thread.
* @return any reference value which existed.
*/
public V removeForCurrentThread() {
return remove(currentThreadId());
}
/**
* Get the current thread ID.
* @return thread ID.
*/
public long currentThreadId() {
return Thread.currentThread().getId();
}
/**
* Set the new value for the current thread.
* @param newVal new reference to set for the active thread.
* @return the previously set value, possibly null
*/
public V setForCurrentThread(V newVal) {
requireNonNull(newVal);
long id = currentThreadId();
// if the same object is already in the map, just return it.
WeakReference<V> ref = lookup(id);
// Reference value could be set to null. Thus, ref.get() could return
// null. Should be handled accordingly while using the returned value.
if (ref != null && ref.get() == newVal) {
return ref.get();
WeakReference<V> existingWeakRef = lookup(id);
// The looked up reference could be one of
// 1. null: nothing there
// 2. valid but get() == null : reference lost by GC.
// 3. different from the new value
// 4. the same as the old value
if (resolve(existingWeakRef) == newVal) {
// case 4: do nothing, return the new value
return newVal;
} else {
// cases 1, 2, 3: update the map and return the old value
return put(id, newVal);
}
return put(id, newVal);
}
}

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.fs.statistics;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsContextIntegration;
import static java.util.Objects.requireNonNull;
/**
* An interface defined to capture thread-level IOStatistics by using per
* thread context.
@ -67,7 +69,11 @@ public interface IOStatisticsContext extends IOStatisticsSource {
* @return instance of IOStatisticsContext for the context.
*/
static IOStatisticsContext getCurrentIOStatisticsContext() {
return IOStatisticsContextIntegration.getCurrentIOStatisticsContext();
// the null check is just a safety check to highlight exactly where a null value would
// be returned if HADOOP-18456 has resurfaced.
return requireNonNull(
IOStatisticsContextIntegration.getCurrentIOStatisticsContext(),
"Null IOStatisticsContext");
}
/**

View File

@ -100,7 +100,10 @@ public final class IOStatisticsContextIntegration {
* @return an instance of IOStatisticsContext.
*/
private static IOStatisticsContext createNewInstance(Long key) {
return new IOStatisticsContextImpl(key, INSTANCE_ID.getAndIncrement());
IOStatisticsContextImpl instance =
new IOStatisticsContextImpl(key, INSTANCE_ID.getAndIncrement());
LOG.debug("Created instance {}", instance);
return instance;
}
/**
@ -131,9 +134,11 @@ public final class IOStatisticsContextIntegration {
IOStatisticsContext statisticsContext) {
if (isThreadIOStatsEnabled) {
if (statisticsContext == null) {
// new value is null, so remove it
ACTIVE_IOSTATS_CONTEXT.removeForCurrentThread();
}
if (ACTIVE_IOSTATS_CONTEXT.getForCurrentThread() != statisticsContext) {
} else {
// the setter is efficient in that it does not create a new
// reference if the context is unchanged.
ACTIVE_IOSTATS_CONTEXT.setForCurrentThread(statisticsContext);
}
}

View File

@ -28,7 +28,11 @@ import java.util.function.Function;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.store.LogExactlyOnce;
import static java.util.Objects.requireNonNull;
@ -52,6 +56,9 @@ import static java.util.Objects.requireNonNull;
@InterfaceAudience.Private
public class WeakReferenceMap<K, V> {
private static final Logger LOG =
LoggerFactory.getLogger(WeakReferenceMap.class);
/**
* The reference map.
*/
@ -79,6 +86,12 @@ public class WeakReferenceMap<K, V> {
*/
private final AtomicLong entriesCreatedCount = new AtomicLong();
/**
* Log to report loss of a reference during the create phase, which
* is believed to be a cause of HADOOP-18456.
*/
private final LogExactlyOnce referenceLostDuringCreation = new LogExactlyOnce(LOG);
/**
* instantiate.
* @param factory supplier of new instances
@ -132,35 +145,93 @@ public class WeakReferenceMap<K, V> {
* @return an instance.
*/
public V get(K key) {
final WeakReference<V> current = lookup(key);
V val = resolve(current);
if (val != null) {
final WeakReference<V> currentWeakRef = lookup(key);
// resolve it, after which if not null, we have a strong reference
V strongVal = resolve(currentWeakRef);
if (strongVal != null) {
// all good.
return val;
return strongVal;
}
// here, either no ref, or the value is null
if (current != null) {
// here, either currentWeakRef was null, or its reference was GC'd.
if (currentWeakRef != null) {
// garbage collection removed the reference.
// explicitly remove the weak ref from the map if it has not
// been updated by this point
// this is here just for completeness.
map.remove(key, currentWeakRef);
// log/report the loss.
noteLost(key);
}
// create a new value and add it to the map
return create(key);
}
/**
* Create a new instance under a key.
* <p>
* The instance is created, added to the map and then the
* map value retrieved.
* This ensures that the reference returned is that in the map,
* even if there is more than one entry being created at the same time.
* If that race does occur, it will be logged the first time it happens
* for this specific map instance.
* <p>
* HADOOP-18456 highlighted the risk of a concurrent GC resulting a null
* value being retrieved and so returned.
* To prevent this:
* <ol>
* <li>A strong reference is retained to the newly created instance
* in a local variable.</li>
* <li>That variable is used after the resolution process, to ensure
* the JVM doesn't consider it "unreachable" and so eligible for GC.</li>
* <li>A check is made for the resolved reference being null, and if so,
* the put() is repeated</li>
* </ol>
* @param key key
* @return the value
* @return the created value
*/
public V create(K key) {
entriesCreatedCount.incrementAndGet();
WeakReference<V> newRef = new WeakReference<>(
requireNonNull(factory.apply(key)));
map.put(key, newRef);
return map.get(key).get();
/*
Get a strong ref so even if a GC happens in this method the reference is not lost.
It is NOT enough to have a reference in a field, it MUST be used
so as to ensure the reference isn't optimized away prematurely.
"A reachable object is any object that can be accessed in any potential continuing
computation from any live thread."
*/
final V strongRef = requireNonNull(factory.apply(key),
"factory returned a null instance");
V resolvedStrongRef;
do {
WeakReference<V> newWeakRef = new WeakReference<>(strongRef);
// put it in the map
map.put(key, newWeakRef);
// get it back from the map
WeakReference<V> retrievedWeakRef = map.get(key);
// resolve that reference, handling the situation where somehow it was removed from the map
// between the put() and the get()
resolvedStrongRef = resolve(retrievedWeakRef);
if (resolvedStrongRef == null) {
referenceLostDuringCreation.warn("reference to %s lost during creation", key);
noteLost(key);
}
} while (resolvedStrongRef == null);
// note if there was any change in the reference.
// as this forces strongRef to be kept in scope
if (strongRef != resolvedStrongRef) {
LOG.debug("Created instance for key {}: {} overwritten by {}",
key, strongRef, resolvedStrongRef);
}
return resolvedStrongRef;
}
/**
@ -203,7 +274,7 @@ public class WeakReferenceMap<K, V> {
* @param r reference to resolve
* @return the value or null
*/
private V resolve(WeakReference<V> r) {
protected V resolve(WeakReference<V> r) {
return r == null ? null : r.get();
}
@ -233,7 +304,7 @@ public class WeakReferenceMap<K, V> {
* @param key key of lost reference
*/
private void noteLost(final K key) {
// incrment local counter
// increment local counter
referenceLostCount.incrementAndGet();
// and call any notification function supplied in the constructor

View File

@ -20,15 +20,19 @@ package org.apache.hadoop.util;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;
import org.apache.hadoop.fs.impl.WeakReferenceThreadMap;
import org.apache.hadoop.test.AbstractHadoopTestBase;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Test {@link WeakReferenceMap}.
* Test {@link WeakReferenceMap} and {@link WeakReferenceThreadMap}.
* There's no attempt to force GC here, so the tests are
* more about the basic behavior not the handling of empty references.
*/
@ -125,11 +129,92 @@ public class TestWeakReferenceMap extends AbstractHadoopTestBase {
}
/**
* It is an error to have a factory which returns null.
*/
@Test
public void testFactoryReturningNull() throws Throwable {
referenceMap = new WeakReferenceMap<>(
(k) -> null,
null);
intercept(NullPointerException.class, () ->
referenceMap.get(0));
}
/**
* Test the WeakReferenceThreadMap extension.
*/
@Test
public void testWeakReferenceThreadMapAssignment()
throws Throwable {
// counters foor the callbacks
final AtomicLong created = new AtomicLong();
final AtomicLong lost = new AtomicLong();
WeakReferenceThreadMap<String> threadMap = new WeakReferenceThreadMap<>(
id -> "Entry for thread ID " + id + " (" + created.incrementAndGet() + ")",
id -> lost.incrementAndGet());
Assertions.assertThat(threadMap.setForCurrentThread("hello"))
.describedAs("current thread map value on first set")
.isNull();
// second attempt returns itself
Assertions.assertThat(threadMap.setForCurrentThread("hello"))
.describedAs("current thread map value on second set")
.isEqualTo("hello");
// it is forbidden to explicitly set to null via the set() call.
intercept(NullPointerException.class, () ->
threadMap.setForCurrentThread(null));
// the map is unchanged
Assertions.assertThat(threadMap.getForCurrentThread())
.describedAs("current thread map value")
.isEqualTo("hello");
// remove the value and assert what the removed entry was
Assertions.assertThat(threadMap.removeForCurrentThread())
.describedAs("removed thread map value")
.isEqualTo("hello");
// remove the value again; this time the removed value is null
Assertions.assertThat(threadMap.removeForCurrentThread())
.describedAs("removed thread map value on second call")
.isNull();
// lookup will return a new instance created by the factory
long c1 = created.get();
String dynamicValue = threadMap.getForCurrentThread();
Assertions.assertThat(dynamicValue)
.describedAs("dynamically created thread map value")
.startsWith("Entry for thread ID")
.contains("(" + (c1 + 1) + ")");
// and we can overwrite that
Assertions.assertThat(threadMap.setForCurrentThread("hello2"))
.describedAs("value before the thread entry is changed")
.isEqualTo(dynamicValue);
// simulate a weak gc
long threadId = threadMap.currentThreadId();
threadMap.put(threadId, null);
String updated = threadMap.getForCurrentThread();
Assertions.assertThat(lost.get())
.describedAs("lost count")
.isEqualTo(1);
Assertions.assertThat(updated)
.describedAs("dynamically created thread map value")
.startsWith("Entry for thread ID")
.contains("(" + (c1 + 2) + ")");
}
/**
* Assert that the value of a map entry is as expected.
* Will trigger entry creation if the key is absent.
* @param key key
* @param val expected valued
* @param val expected value
*/
private void assertMapEntryEquals(int key, String val) {
Assertions.assertThat(referenceMap.get(key))
@ -143,7 +228,7 @@ public class TestWeakReferenceMap extends AbstractHadoopTestBase {
*/
private void assertMapContainsKey(int key) {
Assertions.assertThat(referenceMap.containsKey(key))
.describedAs("map enty of key %d should be present", key)
.describedAs("map entry of key %d should be present", key)
.isTrue();
}