MAPREDUCE-7370. Parallelize MultipleOutputs#close call (#4248). Contributed by Ashutosh Gupta.

Reviewed-by: Akira Ajisaka <aajisaka@apache.org>
Signed-off-by: Chris Nauroth <cnauroth@apache.org>
(cherry picked from commit 062c50db6b)
This commit is contained in:
Ashutosh Gupta 2022-10-06 23:23:05 +01:00 committed by Chris Nauroth
parent 1c3bf42ad0
commit 725cd90712
5 changed files with 165 additions and 9 deletions

View File

@ -17,15 +17,39 @@
*/
package org.apache.hadoop.mapred.lib;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.util.Progressable;
import java.io.IOException;
import java.util.*;
/**
* The MultipleOutputs class simplifies writing to additional outputs other
* than the job default output via the <code>OutputCollector</code> passed to
@ -132,6 +156,7 @@ public class MultipleOutputs {
* Counters group used by the counters of MultipleOutputs.
*/
private static final String COUNTERS_GROUP = MultipleOutputs.class.getName();
private static final Logger LOG = LoggerFactory.getLogger(MultipleOutputs.class);
/**
* Checks if a named output is alreadyDefined or not.
@ -381,6 +406,11 @@ public class MultipleOutputs {
private Map<String, RecordWriter> recordWriters;
private boolean countersEnabled;
@VisibleForTesting
synchronized void setRecordWriters(Map<String, RecordWriter> recordWriters) {
this.recordWriters = recordWriters;
}
/**
* Creates and initializes multiple named outputs support, it should be
* instantiated in the Mapper/Reducer configure method.
@ -528,8 +558,41 @@ public class MultipleOutputs {
* could not be closed properly.
*/
public void close() throws IOException {
int nThreads = conf.getInt(MRConfig.MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT,
MRConfig.DEFAULT_MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT);
AtomicBoolean encounteredException = new AtomicBoolean(false);
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("MultipleOutputs-close")
.setUncaughtExceptionHandler(((t, e) -> {
LOG.error("Thread " + t + " failed unexpectedly", e);
encounteredException.set(true);
})).build();
ExecutorService executorService = Executors.newFixedThreadPool(nThreads, threadFactory);
List<Callable<Object>> callableList = new ArrayList<>(recordWriters.size());
for (RecordWriter writer : recordWriters.values()) {
writer.close(null);
callableList.add(() -> {
try {
writer.close(null);
} catch (IOException e) {
LOG.error("Error while closing MultipleOutput file", e);
encounteredException.set(true);
}
return null;
});
}
try {
executorService.invokeAll(callableList);
} catch (InterruptedException e) {
LOG.warn("Closing is Interrupted");
Thread.currentThread().interrupt();
} finally {
executorService.shutdown();
}
if (encounteredException.get()) {
throw new IOException(
"One or more threads encountered exception during close. See prior errors.");
}
}

View File

@ -131,5 +131,7 @@ public interface MRConfig {
String MASTER_WEBAPP_UI_ACTIONS_ENABLED =
"mapreduce.webapp.ui-actions.enabled";
boolean DEFAULT_MASTER_WEBAPP_UI_ACTIONS_ENABLED = true;
String MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT = "mapreduce.multiple-outputs-close-threads";
int DEFAULT_MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT = 10;
}

View File

@ -19,16 +19,24 @@ package org.apache.hadoop.mapreduce.lib.output;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The MultipleOutputs class simplifies writing output data
@ -193,6 +201,8 @@ public class MultipleOutputs<KEYOUT, VALUEOUT> {
* Counters group used by the counters of MultipleOutputs.
*/
private static final String COUNTERS_GROUP = MultipleOutputs.class.getName();
private static final Logger LOG =
LoggerFactory.getLogger(org.apache.hadoop.mapred.lib.MultipleOutputs.class);
/**
* Cache for the taskContexts
@ -347,6 +357,11 @@ public class MultipleOutputs<KEYOUT, VALUEOUT> {
return job.getConfiguration().getBoolean(COUNTERS_ENABLED, false);
}
@VisibleForTesting
synchronized void setRecordWriters(Map<String, RecordWriter<?, ?>> recordWriters) {
this.recordWriters = recordWriters;
}
/**
* Wraps RecordWriter to increment counters.
*/
@ -570,8 +585,43 @@ public class MultipleOutputs<KEYOUT, VALUEOUT> {
*/
@SuppressWarnings("unchecked")
public void close() throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
int nThreads = conf.getInt(MRConfig.MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT,
MRConfig.DEFAULT_MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT);
AtomicBoolean encounteredException = new AtomicBoolean(false);
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("MultipleOutputs-close")
.setUncaughtExceptionHandler(((t, e) -> {
LOG.error("Thread " + t + " failed unexpectedly", e);
encounteredException.set(true);
})).build();
ExecutorService executorService = Executors.newFixedThreadPool(nThreads, threadFactory);
List<Callable<Object>> callableList = new ArrayList<>(recordWriters.size());
for (RecordWriter writer : recordWriters.values()) {
writer.close(context);
callableList.add(() -> {
try {
writer.close(context);
} catch (IOException e) {
LOG.error("Error while closing MultipleOutput file", e);
encounteredException.set(true);
}
return null;
});
}
try {
executorService.invokeAll(callableList);
} catch (InterruptedException e) {
LOG.warn("Closing is Interrupted");
Thread.currentThread().interrupt();
} finally {
executorService.shutdown();
}
if (encounteredException.get()) {
throw new IOException(
"One or more threads encountered exception during close. See prior errors.");
}
}
}

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
@ -46,11 +47,16 @@ import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestMultipleOutputs extends HadoopTestCase {
@ -70,6 +76,19 @@ public class TestMultipleOutputs extends HadoopTestCase {
_testMOWithJavaSerialization(true);
}
@SuppressWarnings("unchecked")
@Test(expected = IOException.class)
public void testParallelCloseIOException() throws IOException {
RecordWriter writer = mock(RecordWriter.class);
Map<String, RecordWriter> recordWriters = mock(Map.class);
when(recordWriters.values()).thenReturn(Arrays.asList(writer, writer));
doThrow(new IOException("test IO exception")).when(writer).close(null);
JobConf conf = createJobConf();
MultipleOutputs mos = new MultipleOutputs(conf);
mos.setRecordWriters(recordWriters);
mos.close();
}
private static final Path ROOT_DIR = new Path("testing/mo");
private static final Path IN_DIR = new Path(ROOT_DIR, "input");
private static final Path OUT_DIR = new Path(ROOT_DIR, "output");
@ -307,6 +326,7 @@ public class TestMultipleOutputs extends HadoopTestCase {
}
@SuppressWarnings({"unchecked"})
public static class MOMap implements Mapper<LongWritable, Text, LongWritable,
Text> {

View File

@ -31,7 +31,9 @@ import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.Reducer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -39,10 +41,15 @@ import org.junit.Test;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestMRMultipleOutputs extends HadoopTestCase {
@ -62,6 +69,20 @@ public class TestMRMultipleOutputs extends HadoopTestCase {
_testMOWithJavaSerialization(true);
}
@SuppressWarnings("unchecked")
@Test(expected = IOException.class)
public void testParallelCloseIOException() throws IOException, InterruptedException {
RecordWriter writer = mock(RecordWriter.class);
Map recordWriters = mock(Map.class);
when(recordWriters.values()).thenReturn(Arrays.asList(writer, writer));
Mapper.Context taskInputOutputContext = mock(Mapper.Context.class);
when(taskInputOutputContext.getConfiguration()).thenReturn(createJobConf());
doThrow(new IOException("test IO exception")).when(writer).close(taskInputOutputContext);
MultipleOutputs<Long, String> mos = new MultipleOutputs<Long, String>(taskInputOutputContext);
mos.setRecordWriters(recordWriters);
mos.close();
}
private static String localPathRoot =
System.getProperty("test.build.data", "/tmp");
private static final Path ROOT_DIR = new Path(localPathRoot, "testing/mo");