HADOOP-11969. ThreadLocal initialization in several classes is not thread safe (Sean Busbey via Colin P. McCabe)

This commit is contained in:
Colin Patrick Mccabe 2015-05-26 12:15:46 -07:00
parent 10732d515f
commit 7dba7005b7
20 changed files with 66 additions and 49 deletions

View File

@ -35,7 +35,8 @@ import org.apache.hadoop.classification.InterfaceStability;
public class MD5Hash implements WritableComparable<MD5Hash> { public class MD5Hash implements WritableComparable<MD5Hash> {
public static final int MD5_LEN = 16; public static final int MD5_LEN = 16;
private static ThreadLocal<MessageDigest> DIGESTER_FACTORY = new ThreadLocal<MessageDigest>() { private static final ThreadLocal<MessageDigest> DIGESTER_FACTORY =
new ThreadLocal<MessageDigest>() {
@Override @Override
protected MessageDigest initialValue() { protected MessageDigest initialValue() {
try { try {

View File

@ -53,7 +53,7 @@ import org.apache.hadoop.classification.InterfaceStability;
public class Text extends BinaryComparable public class Text extends BinaryComparable
implements WritableComparable<BinaryComparable> { implements WritableComparable<BinaryComparable> {
private static ThreadLocal<CharsetEncoder> ENCODER_FACTORY = private static final ThreadLocal<CharsetEncoder> ENCODER_FACTORY =
new ThreadLocal<CharsetEncoder>() { new ThreadLocal<CharsetEncoder>() {
@Override @Override
protected CharsetEncoder initialValue() { protected CharsetEncoder initialValue() {
@ -63,7 +63,7 @@ public class Text extends BinaryComparable
} }
}; };
private static ThreadLocal<CharsetDecoder> DECODER_FACTORY = private static final ThreadLocal<CharsetDecoder> DECODER_FACTORY =
new ThreadLocal<CharsetDecoder>() { new ThreadLocal<CharsetDecoder>() {
@Override @Override
protected CharsetDecoder initialValue() { protected CharsetDecoder initialValue() {

View File

@ -264,7 +264,7 @@ public class ReflectionUtils {
/** /**
* Allocate a buffer for each thread that tries to clone objects. * Allocate a buffer for each thread that tries to clone objects.
*/ */
private static ThreadLocal<CopyInCopyOutBuffer> cloneBuffers private static final ThreadLocal<CopyInCopyOutBuffer> CLONE_BUFFERS
= new ThreadLocal<CopyInCopyOutBuffer>() { = new ThreadLocal<CopyInCopyOutBuffer>() {
@Override @Override
protected synchronized CopyInCopyOutBuffer initialValue() { protected synchronized CopyInCopyOutBuffer initialValue() {
@ -289,7 +289,7 @@ public class ReflectionUtils {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static <T> T copy(Configuration conf, public static <T> T copy(Configuration conf,
T src, T dst) throws IOException { T src, T dst) throws IOException {
CopyInCopyOutBuffer buffer = cloneBuffers.get(); CopyInCopyOutBuffer buffer = CLONE_BUFFERS.get();
buffer.outBuffer.reset(); buffer.outBuffer.reset();
SerializationFactory factory = getFactory(conf); SerializationFactory factory = getFactory(conf);
Class<T> cls = (Class<T>) src.getClass(); Class<T> cls = (Class<T>) src.getClass();
@ -306,7 +306,7 @@ public class ReflectionUtils {
@Deprecated @Deprecated
public static void cloneWritableInto(Writable dst, public static void cloneWritableInto(Writable dst,
Writable src) throws IOException { Writable src) throws IOException {
CopyInCopyOutBuffer buffer = cloneBuffers.get(); CopyInCopyOutBuffer buffer = CLONE_BUFFERS.get();
buffer.outBuffer.reset(); buffer.outBuffer.reset();
src.write(buffer.outBuffer); src.write(buffer.outBuffer);
buffer.moveData(); buffer.moveData();

View File

@ -49,7 +49,7 @@ public class KMSMDCFilter implements Filter {
} }
} }
private static ThreadLocal<Data> DATA_TL = new ThreadLocal<Data>(); private static final ThreadLocal<Data> DATA_TL = new ThreadLocal<Data>();
public static UserGroupInformation getUgi() { public static UserGroupInformation getUgi() {
return DATA_TL.get().ugi; return DATA_TL.get().ugi;

View File

@ -46,7 +46,8 @@ public abstract class ServerWebApp extends Server implements ServletContextListe
private static final String HTTP_PORT = ".http.port"; private static final String HTTP_PORT = ".http.port";
public static final String SSL_ENABLED = ".ssl.enabled"; public static final String SSL_ENABLED = ".ssl.enabled";
private static ThreadLocal<String> HOME_DIR_TL = new ThreadLocal<String>(); private static final ThreadLocal<String> HOME_DIR_TL =
new ThreadLocal<String>();
private InetSocketAddress authority; private InetSocketAddress authority;

View File

@ -89,7 +89,7 @@ public class TestDirHelper implements MethodRule {
} }
} }
private static ThreadLocal<File> TEST_DIR_TL = new InheritableThreadLocal<File>(); private static final ThreadLocal<File> TEST_DIR_TL = new InheritableThreadLocal<File>();
@Override @Override
public Statement apply(final Statement statement, final FrameworkMethod frameworkMethod, final Object o) { public Statement apply(final Statement statement, final FrameworkMethod frameworkMethod, final Object o) {

View File

@ -39,9 +39,9 @@ public class TestHdfsHelper extends TestDirHelper {
public static final String HADOOP_MINI_HDFS = "test.hadoop.hdfs"; public static final String HADOOP_MINI_HDFS = "test.hadoop.hdfs";
private static ThreadLocal<Configuration> HDFS_CONF_TL = new InheritableThreadLocal<Configuration>(); private static final ThreadLocal<Configuration> HDFS_CONF_TL = new InheritableThreadLocal<Configuration>();
private static ThreadLocal<Path> HDFS_TEST_DIR_TL = new InheritableThreadLocal<Path>(); private static final ThreadLocal<Path> HDFS_TEST_DIR_TL = new InheritableThreadLocal<Path>();
@Override @Override
public Statement apply(Statement statement, FrameworkMethod frameworkMethod, Object o) { public Statement apply(Statement statement, FrameworkMethod frameworkMethod, Object o) {

View File

@ -52,7 +52,7 @@ public class TestJettyHelper implements MethodRule {
this.keyStorePassword = keyStorePassword; this.keyStorePassword = keyStorePassword;
} }
private static ThreadLocal<TestJettyHelper> TEST_JETTY_TL = private static final ThreadLocal<TestJettyHelper> TEST_JETTY_TL =
new InheritableThreadLocal<TestJettyHelper>(); new InheritableThreadLocal<TestJettyHelper>();
@Override @Override

View File

@ -290,7 +290,7 @@ class Chain extends org.apache.hadoop.mapreduce.lib.chain.Chain {
// using a ThreadLocal to reuse the ByteArrayOutputStream used for ser/deser // using a ThreadLocal to reuse the ByteArrayOutputStream used for ser/deser
// it has to be a thread local because if not it would break if used from a // it has to be a thread local because if not it would break if used from a
// MultiThreadedMapRunner. // MultiThreadedMapRunner.
private ThreadLocal<DataOutputBuffer> threadLocalDataOutputBuffer = private final ThreadLocal<DataOutputBuffer> threadLocalDataOutputBuffer =
new ThreadLocal<DataOutputBuffer>() { new ThreadLocal<DataOutputBuffer>() {
protected DataOutputBuffer initialValue() { protected DataOutputBuffer initialValue() {
return new DataOutputBuffer(1024); return new DataOutputBuffer(1024);

View File

@ -32,7 +32,7 @@ class PipesPartitioner<K extends WritableComparable,
V extends Writable> V extends Writable>
implements Partitioner<K, V> { implements Partitioner<K, V> {
private static ThreadLocal<Integer> cache = new ThreadLocal<Integer>(); private static final ThreadLocal<Integer> CACHE = new ThreadLocal<Integer>();
private Partitioner<K, V> part = null; private Partitioner<K, V> part = null;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -46,7 +46,7 @@ class PipesPartitioner<K extends WritableComparable,
* @param newValue the next partition value * @param newValue the next partition value
*/ */
static void setNextPartition(int newValue) { static void setNextPartition(int newValue) {
cache.set(newValue); CACHE.set(newValue);
} }
/** /**
@ -58,7 +58,7 @@ class PipesPartitioner<K extends WritableComparable,
*/ */
public int getPartition(K key, V value, public int getPartition(K key, V value,
int numPartitions) { int numPartitions) {
Integer result = cache.get(); Integer result = CACHE.get();
if (result == null) { if (result == null) {
return part.getPartition(key, value, numPartitions); return part.getPartition(key, value, numPartitions);
} else { } else {

View File

@ -54,7 +54,8 @@ import org.apache.hadoop.util.Time;
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> { public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
static ThreadLocal<Long> shuffleStart = new ThreadLocal<Long>() { private static final ThreadLocal<Long> SHUFFLE_START =
new ThreadLocal<Long>() {
protected Long initialValue() { protected Long initialValue() {
return 0L; return 0L;
} }
@ -423,7 +424,7 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
LOG.debug("Assigning " + host + " with " + host.getNumKnownMapOutputs() + LOG.debug("Assigning " + host + " with " + host.getNumKnownMapOutputs() +
" to " + Thread.currentThread().getName()); " to " + Thread.currentThread().getName());
shuffleStart.set(Time.monotonicNow()); SHUFFLE_START.set(Time.monotonicNow());
return host; return host;
} }
@ -464,7 +465,7 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
} }
} }
LOG.info(host + " freed by " + Thread.currentThread().getName() + " in " + LOG.info(host + " freed by " + Thread.currentThread().getName() + " in " +
(Time.monotonicNow()-shuffleStart.get()) + "ms"); (Time.monotonicNow()-SHUFFLE_START.get()) + "ms");
} }
public synchronized void resetKnownMaps() { public synchronized void resetKnownMaps() {

View File

@ -406,7 +406,7 @@ public class DistCpUtils {
/** /**
* String utility to convert a number-of-bytes to human readable format. * String utility to convert a number-of-bytes to human readable format.
*/ */
private static ThreadLocal<DecimalFormat> FORMATTER private static final ThreadLocal<DecimalFormat> FORMATTER
= new ThreadLocal<DecimalFormat>() { = new ThreadLocal<DecimalFormat>() {
@Override @Override
protected DecimalFormat initialValue() { protected DecimalFormat initialValue() {

View File

@ -57,9 +57,10 @@ public class BinaryRecordInput implements RecordInput {
this.in = inp; this.in = inp;
} }
private static ThreadLocal bIn = new ThreadLocal() { private static final ThreadLocal<BinaryRecordInput> B_IN =
new ThreadLocal<BinaryRecordInput>() {
@Override @Override
protected synchronized Object initialValue() { protected BinaryRecordInput initialValue() {
return new BinaryRecordInput(); return new BinaryRecordInput();
} }
}; };
@ -70,7 +71,7 @@ public class BinaryRecordInput implements RecordInput {
* @return binary record input corresponding to the supplied DataInput. * @return binary record input corresponding to the supplied DataInput.
*/ */
public static BinaryRecordInput get(DataInput inp) { public static BinaryRecordInput get(DataInput inp) {
BinaryRecordInput bin = (BinaryRecordInput) bIn.get(); BinaryRecordInput bin = B_IN.get();
bin.setDataInput(inp); bin.setDataInput(inp);
return bin; return bin;
} }

View File

@ -44,20 +44,21 @@ public class BinaryRecordOutput implements RecordOutput {
this.out = out; this.out = out;
} }
private static ThreadLocal bOut = new ThreadLocal() { private static final ThreadLocal<BinaryRecordOutput> B_OUT =
@Override new ThreadLocal<BinaryRecordOutput>() {
protected synchronized Object initialValue() { @Override
return new BinaryRecordOutput(); protected BinaryRecordOutput initialValue() {
} return new BinaryRecordOutput();
}; }
};
/** /**
* Get a thread-local record output for the supplied DataOutput. * Get a thread-local record output for the supplied DataOutput.
* @param out data output stream * @param out data output stream
* @return binary record output corresponding to the supplied DataOutput. * @return binary record output corresponding to the supplied DataOutput.
*/ */
public static BinaryRecordOutput get(DataOutput out) { public static BinaryRecordOutput get(DataOutput out) {
BinaryRecordOutput bout = (BinaryRecordOutput) bOut.get(); BinaryRecordOutput bout = B_OUT.get();
bout.setDataOutput(out); bout.setDataOutput(out);
return bout; return bout;
} }

View File

@ -41,8 +41,10 @@ public class TypedBytesInput {
this.in = in; this.in = in;
} }
private static ThreadLocal tbIn = new ThreadLocal() { private static final ThreadLocal<TypedBytesInput> TB_IN =
protected synchronized Object initialValue() { new ThreadLocal<TypedBytesInput>() {
@Override
protected TypedBytesInput initialValue() {
return new TypedBytesInput(); return new TypedBytesInput();
} }
}; };
@ -53,7 +55,7 @@ public class TypedBytesInput {
* @return typed bytes input corresponding to the supplied {@link DataInput}. * @return typed bytes input corresponding to the supplied {@link DataInput}.
*/ */
public static TypedBytesInput get(DataInput in) { public static TypedBytesInput get(DataInput in) {
TypedBytesInput bin = (TypedBytesInput) tbIn.get(); TypedBytesInput bin = TB_IN.get();
bin.setDataInput(in); bin.setDataInput(in);
return bin; return bin;
} }

View File

@ -42,8 +42,10 @@ public class TypedBytesOutput {
this.out = out; this.out = out;
} }
private static ThreadLocal tbOut = new ThreadLocal() { private static final ThreadLocal<TypedBytesOutput> TB_OUT =
protected synchronized Object initialValue() { new ThreadLocal<TypedBytesOutput>() {
@Override
protected TypedBytesOutput initialValue() {
return new TypedBytesOutput(); return new TypedBytesOutput();
} }
}; };
@ -56,7 +58,7 @@ public class TypedBytesOutput {
* {@link DataOutput}. * {@link DataOutput}.
*/ */
public static TypedBytesOutput get(DataOutput out) { public static TypedBytesOutput get(DataOutput out) {
TypedBytesOutput bout = (TypedBytesOutput) tbOut.get(); TypedBytesOutput bout = TB_OUT.get();
bout.setDataOutput(out); bout.setDataOutput(out);
return bout; return bout;
} }

View File

@ -38,8 +38,10 @@ public class TypedBytesRecordInput implements RecordInput {
this.in = in; this.in = in;
} }
private static ThreadLocal tbIn = new ThreadLocal() { private static final ThreadLocal<TypedBytesRecordInput> TB_IN =
protected synchronized Object initialValue() { new ThreadLocal<TypedBytesRecordInput>() {
@Override
protected TypedBytesRecordInput initialValue() {
return new TypedBytesRecordInput(); return new TypedBytesRecordInput();
} }
}; };
@ -53,7 +55,7 @@ public class TypedBytesRecordInput implements RecordInput {
* {@link TypedBytesInput}. * {@link TypedBytesInput}.
*/ */
public static TypedBytesRecordInput get(TypedBytesInput in) { public static TypedBytesRecordInput get(TypedBytesInput in) {
TypedBytesRecordInput bin = (TypedBytesRecordInput) tbIn.get(); TypedBytesRecordInput bin = TB_IN.get();
bin.setTypedBytesInput(in); bin.setTypedBytesInput(in);
return bin; return bin;
} }

View File

@ -40,8 +40,10 @@ public class TypedBytesRecordOutput implements RecordOutput {
this.out = out; this.out = out;
} }
private static ThreadLocal tbOut = new ThreadLocal() { private static final ThreadLocal<TypedBytesRecordOutput> TB_OUT =
protected synchronized Object initialValue() { new ThreadLocal<TypedBytesRecordOutput>() {
@Override
protected TypedBytesRecordOutput initialValue() {
return new TypedBytesRecordOutput(); return new TypedBytesRecordOutput();
} }
}; };
@ -55,7 +57,7 @@ public class TypedBytesRecordOutput implements RecordOutput {
* {@link TypedBytesOutput}. * {@link TypedBytesOutput}.
*/ */
public static TypedBytesRecordOutput get(TypedBytesOutput out) { public static TypedBytesRecordOutput get(TypedBytesOutput out) {
TypedBytesRecordOutput bout = (TypedBytesRecordOutput) tbOut.get(); TypedBytesRecordOutput bout = TB_OUT.get();
bout.setTypedBytesOutput(out); bout.setTypedBytesOutput(out);
return bout; return bout;
} }

View File

@ -61,8 +61,10 @@ public class TypedBytesWritableInput implements Configurable {
this.in = in; this.in = in;
} }
private static ThreadLocal tbIn = new ThreadLocal() { private static final ThreadLocal<TypedBytesWritableInput> TB_IN =
protected synchronized Object initialValue() { new ThreadLocal<TypedBytesWritableInput>() {
@Override
protected TypedBytesWritableInput initialValue() {
return new TypedBytesWritableInput(); return new TypedBytesWritableInput();
} }
}; };
@ -76,7 +78,7 @@ public class TypedBytesWritableInput implements Configurable {
* {@link TypedBytesInput}. * {@link TypedBytesInput}.
*/ */
public static TypedBytesWritableInput get(TypedBytesInput in) { public static TypedBytesWritableInput get(TypedBytesInput in) {
TypedBytesWritableInput bin = (TypedBytesWritableInput) tbIn.get(); TypedBytesWritableInput bin = TB_IN.get();
bin.setTypedBytesInput(in); bin.setTypedBytesInput(in);
return bin; return bin;
} }

View File

@ -58,8 +58,10 @@ public class TypedBytesWritableOutput {
this.out = out; this.out = out;
} }
private static ThreadLocal tbOut = new ThreadLocal() { private static final ThreadLocal<TypedBytesWritableOutput> TB_OUT =
protected synchronized Object initialValue() { new ThreadLocal<TypedBytesWritableOutput>() {
@Override
protected TypedBytesWritableOutput initialValue() {
return new TypedBytesWritableOutput(); return new TypedBytesWritableOutput();
} }
}; };
@ -73,7 +75,7 @@ public class TypedBytesWritableOutput {
* {@link TypedBytesOutput}. * {@link TypedBytesOutput}.
*/ */
public static TypedBytesWritableOutput get(TypedBytesOutput out) { public static TypedBytesWritableOutput get(TypedBytesOutput out) {
TypedBytesWritableOutput bout = (TypedBytesWritableOutput) tbOut.get(); TypedBytesWritableOutput bout = TB_OUT.get();
bout.setTypedBytesOutput(out); bout.setTypedBytesOutput(out);
return bout; return bout;
} }