MAPREDUCE-5053. java.lang.InternalError from decompression codec cause reducer to fail (Robert Parker via jeagles)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1458351 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jonathan Turner Eagles 2013-03-19 15:48:36 +00:00
parent 9f7823656b
commit 3fdeec0bd3
3 changed files with 77 additions and 8 deletions

View File

@ -650,6 +650,9 @@ Release 0.23.7 - UNRELEASED
MAPREDUCE-5042. Reducer unable to fetch for a map task that was recovered
(Jason Lowe via bobby)
MAPREDUCE-5053. java.lang.InternalError from decompression codec cause
reducer to fail (Robert Parker via jeagles)
Release 0.23.6 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -357,13 +357,20 @@ class Fetcher<K,V> extends Thread {
return EMPTY_ATTEMPT_ID_ARRAY;
}
// Go!
LOG.info("fetcher#" + id + " about to shuffle output of map " +
mapOutput.getMapId() + " decomp: " +
decompressedLength + " len: " + compressedLength + " to " +
mapOutput.getDescription());
mapOutput.shuffle(host, input, compressedLength, decompressedLength,
metrics, reporter);
// The codec for lz0,lz4,snappy,bz2,etc. throw java.lang.InternalError
// on decompression failures. Catching and re-throwing as IOException
// to allow fetch failure logic to be processed
try {
// Go!
LOG.info("fetcher#" + id + " about to shuffle output of map "
+ mapOutput.getMapId() + " decomp: " + decompressedLength
+ " len: " + compressedLength + " to " + mapOutput.getDescription());
mapOutput.shuffle(host, input, compressedLength, decompressedLength,
metrics, reporter);
} catch (java.lang.InternalError e) {
LOG.warn("Failed to shuffle for fetcher#"+id, e);
throw new IOException(e);
}
// Inform the shuffle scheduler
long endTime = System.currentTimeMillis();

View File

@ -25,6 +25,7 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.SocketTimeoutException;
import java.net.URL;
@ -233,4 +234,62 @@ public class TestFetcher {
verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
}
@SuppressWarnings("unchecked")
@Test(timeout=10000)
public void testCopyFromHostCompressFailure() throws Exception {
LOG.info("testCopyFromHostCompressFailure");
JobConf job = new JobConf();
TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
ShuffleScheduler<Text, Text> ss = mock(ShuffleScheduler.class);
MergeManagerImpl<Text, Text> mm = mock(MergeManagerImpl.class);
InMemoryMapOutput<Text, Text> immo = mock(InMemoryMapOutput.class);
Reporter r = mock(Reporter.class);
ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
ExceptionReporter except = mock(ExceptionReporter.class);
SecretKey key = JobTokenSecretManager.createSecretKey(new byte[]{0,0,0,0});
HttpURLConnection connection = mock(HttpURLConnection.class);
Counters.Counter allErrs = mock(Counters.Counter.class);
when(r.getCounter(anyString(), anyString()))
.thenReturn(allErrs);
Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,
r, metrics, except, key, connection);
MapHost host = new MapHost("localhost", "http://localhost:8080/");
ArrayList<TaskAttemptID> maps = new ArrayList<TaskAttemptID>(1);
TaskAttemptID map1ID = TaskAttemptID.forName("attempt_0_1_m_1_1");
maps.add(map1ID);
TaskAttemptID map2ID = TaskAttemptID.forName("attempt_0_1_m_2_1");
maps.add(map2ID);
when(ss.getMapsForHost(host)).thenReturn(maps);
String encHash = "vFE234EIFCiBgYs2tCXY/SjT8Kg=";
String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
when(connection.getResponseCode()).thenReturn(200);
when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
.thenReturn(replyHash);
ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
ByteArrayOutputStream bout = new ByteArrayOutputStream();
header.write(new DataOutputStream(bout));
ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
when(connection.getInputStream()).thenReturn(in);
when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
.thenReturn(immo);
doThrow(new java.lang.InternalError())
.when(immo)
.shuffle(any(MapHost.class), any(InputStream.class), anyLong(),
anyLong(), any(ShuffleClientMetrics.class), any(Reporter.class));
underTest.copyFromHost(host);
verify(connection)
.addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH,
encHash);
verify(ss, times(1)).copyFailed(map1ID, host, true, false);
}
}