MAPREDUCE-7390. Remove WhiteBox in mapreduce module. (#4462)

Signed-off-by: Akira Ajisaka <aajisaka@apache.org>
This commit is contained in:
slfan1989 2022-11-14 09:45:20 +08:00 committed by GitHub
parent d3c1c453f0
commit 04b31d7ecf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 20 additions and 13 deletions

View File

@ -879,4 +879,9 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
return super.compareTo(obj); return super.compareTo(obj);
} }
} }
@VisibleForTesting
OnDiskMerger getOnDiskMerger() {
return onDiskMerger;
}
} }

View File

@ -25,6 +25,7 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.classification.VisibleForTesting;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -109,4 +110,14 @@ abstract class MergeThread<T,K,V> extends Thread {
} }
public abstract void merge(List<T> inputs) throws IOException; public abstract void merge(List<T> inputs) throws IOException;
@VisibleForTesting
int getMergeFactor() {
return mergeFactor;
}
@VisibleForTesting
LinkedList<List<T>> getPendingToBeMerged() {
return pendingToBeMerged;
}
} }

View File

@ -23,7 +23,6 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import java.io.IOException; import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
@ -44,7 +43,6 @@ import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath; import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;
import org.apache.hadoop.test.Whitebox;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -217,8 +215,7 @@ public class TestMergeManager {
@SuppressWarnings({ "unchecked", "deprecation" }) @SuppressWarnings({ "unchecked", "deprecation" })
@Test(timeout=10000) @Test(timeout=10000)
public void testOnDiskMerger() throws IOException, URISyntaxException, public void testOnDiskMerger() throws IOException {
InterruptedException {
JobConf jobConf = new JobConf(); JobConf jobConf = new JobConf();
final int SORT_FACTOR = 5; final int SORT_FACTOR = 5;
jobConf.setInt(MRJobConfig.IO_SORT_FACTOR, SORT_FACTOR); jobConf.setInt(MRJobConfig.IO_SORT_FACTOR, SORT_FACTOR);
@ -229,12 +226,8 @@ public class TestMergeManager {
new MergeManagerImpl<IntWritable, IntWritable>(null, jobConf, fs, null new MergeManagerImpl<IntWritable, IntWritable>(null, jobConf, fs, null
, null, null, null, null, null, null, null, null, null, mapOutputFile); , null, null, null, null, null, null, null, null, null, mapOutputFile);
MergeThread<MapOutput<IntWritable, IntWritable>, IntWritable, IntWritable> MergeThread onDiskMerger = manager.getOnDiskMerger();
onDiskMerger = (MergeThread<MapOutput<IntWritable, IntWritable>, int mergeFactor = onDiskMerger.getMergeFactor();
IntWritable, IntWritable>) Whitebox.getInternalState(manager,
"onDiskMerger");
int mergeFactor = (Integer) Whitebox.getInternalState(onDiskMerger,
"mergeFactor");
// make sure the io.sort.factor is set properly // make sure the io.sort.factor is set properly
assertEquals(mergeFactor, SORT_FACTOR); assertEquals(mergeFactor, SORT_FACTOR);
@ -252,9 +245,7 @@ public class TestMergeManager {
} }
//Check that the files pending to be merged are in sorted order. //Check that the files pending to be merged are in sorted order.
LinkedList<List<CompressAwarePath>> pendingToBeMerged = LinkedList<List<CompressAwarePath>> pendingToBeMerged = onDiskMerger.getPendingToBeMerged();
(LinkedList<List<CompressAwarePath>>) Whitebox.getInternalState(
onDiskMerger, "pendingToBeMerged");
assertTrue("No inputs were added to list pending to merge", assertTrue("No inputs were added to list pending to merge",
pendingToBeMerged.size() > 0); pendingToBeMerged.size() > 0);
for(int i = 0; i < pendingToBeMerged.size(); ++i) { for(int i = 0; i < pendingToBeMerged.size(); ++i) {