MAPREDUCE-6715. Fix Several Unsafe Practices (Contributed by Yufei Gu via Daniel Templeton)

This commit is contained in:
Daniel Templeton 2017-01-05 17:55:05 -08:00
parent 5d182949ba
commit 0b8a7c18dd
6 changed files with 56 additions and 35 deletions

View File

@ -136,7 +136,12 @@ class CleanupQueue {
LOG.debug("DELETED " + context.fullPath);
}
} catch (InterruptedException t) {
LOG.warn("Interrupted deletion of " + context.fullPath);
if (context == null) {
LOG.warn("Interrupted deletion of an invalid path: Path deletion "
+ "context is null.");
} else {
LOG.warn("Interrupted deletion of " + context.fullPath);
}
return;
} catch (Exception e) {
LOG.warn("Error deleting path " + context.fullPath + ": " + e);

View File

@ -411,8 +411,14 @@ public class MapTask extends Task {
LOG.warn(msg, e);
}
}
throw new IOException("Initialization of all the collectors failed. " +
"Error in last collector was :" + lastException.getMessage(), lastException);
if (lastException != null) {
throw new IOException("Initialization of all the collectors failed. " +
"Error in last collector was:" + lastException.toString(),
lastException);
} else {
throw new IOException("Initialization of all the collectors failed.");
}
}
@SuppressWarnings("unchecked")

View File

@ -113,19 +113,18 @@ public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
if (isCompressed) {
Class<? extends CompressionCodec> codecClass =
getOutputCompressorClass(job, GzipCodec.class);
codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
codec = ReflectionUtils.newInstance(codecClass, conf);
extension = codec.getDefaultExtension();
}
Path file = getDefaultWorkFile(job, extension);
FileSystem fs = file.getFileSystem(conf);
if (!isCompressed) {
FSDataOutputStream fileOut = fs.create(file, false);
return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
FSDataOutputStream fileOut = fs.create(file, false);
if (isCompressed) {
return new LineRecordWriter<>(
new DataOutputStream(codec.createOutputStream(fileOut)),
keyValueSeparator);
} else {
FSDataOutputStream fileOut = fs.create(file, false);
return new LineRecordWriter<K, V>(new DataOutputStream
(codec.createOutputStream(fileOut)),
keyValueSeparator);
return new LineRecordWriter<>(fileOut, keyValueSeparator);
}
}
}

View File

@ -433,25 +433,29 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
public synchronized MapHost getHost() throws InterruptedException {
while(pendingHosts.isEmpty()) {
wait();
}
while(pendingHosts.isEmpty()) {
wait();
}
MapHost host = null;
Iterator<MapHost> iter = pendingHosts.iterator();
int numToPick = random.nextInt(pendingHosts.size());
for (int i=0; i <= numToPick; ++i) {
host = iter.next();
}
Iterator<MapHost> iter = pendingHosts.iterator();
// Safe to take one because we know pendingHosts isn't empty
MapHost host = iter.next();
int numToPick = random.nextInt(pendingHosts.size());
for (int i = 0; i < numToPick; ++i) {
host = iter.next();
}
pendingHosts.remove(host);
host.markBusy();
pendingHosts.remove(host);
host.markBusy();
LOG.debug("Assigning " + host + " with " + host.getNumKnownMapOutputs() +
" to " + Thread.currentThread().getName());
SHUFFLE_START.set(Time.monotonicNow());
if (LOG.isDebugEnabled()) {
LOG.debug(
"Assigning " + host + " with " + host.getNumKnownMapOutputs() + " to "
+ Thread.currentThread().getName());
}
SHUFFLE_START.set(Time.monotonicNow());
return host;
return host;
}
public synchronized List<TaskAttemptID> getMapsForHost(MapHost host) {
@ -477,8 +481,10 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
host.addKnownMap(id);
}
}
LOG.debug("assigned " + includedMaps + " of " + totalSize + " to " +
host + " to " + Thread.currentThread().getName());
if (LOG.isDebugEnabled()) {
LOG.debug("assigned " + includedMaps + " of " + totalSize + " to " + host
+ " to " + Thread.currentThread().getName());
}
return result;
}

View File

@ -153,6 +153,11 @@ public class Pentomino {
break;
}
}
if (piece == null) {
continue;
}
// for each point where the piece was placed, mark it with the piece name
for(ColumnName item: row) {
if (item instanceof Point) {

View File

@ -73,14 +73,14 @@ class TeraScheduler {
List<String> readFile(String filename) throws IOException {
List<String> result = new ArrayList<String>(10000);
BufferedReader in = new BufferedReader(
new InputStreamReader(new FileInputStream(filename), Charsets.UTF_8));
String line = in.readLine();
while (line != null) {
result.add(line);
line = in.readLine();
try (BufferedReader in = new BufferedReader(
new InputStreamReader(new FileInputStream(filename), Charsets.UTF_8))) {
String line = in.readLine();
while (line != null) {
result.add(line);
line = in.readLine();
}
}
in.close();
return result;
}