MAPREDUCE-5462. In map-side sort, swap entire meta entries instead of indexes for better cache performance. (Sandy Ryza)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1514610 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1f0d7631d5
commit
0c32694b27
|
@ -52,6 +52,9 @@ Release 2.1.1-beta - UNRELEASED
|
||||||
MAPREDUCE-5446. TestJobHistoryEvents and TestJobHistoryParsing have race
|
MAPREDUCE-5446. TestJobHistoryEvents and TestJobHistoryParsing have race
|
||||||
conditions (jlowe via kihwal)
|
conditions (jlowe via kihwal)
|
||||||
|
|
||||||
|
MAPREDUCE-5462. In map-side sort, swap entire meta entries instead of
|
||||||
|
indexes for better cache performance. (Sandy Ryza)
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
||||||
MAPREDUCE-5385. Fixed a bug with JobContext getCacheFiles API. (Omkar Vinit
|
MAPREDUCE-5385. Fixed a bug with JobContext getCacheFiles API. (Omkar Vinit
|
||||||
|
|
|
@ -884,10 +884,10 @@ public class MapTask extends Task {
|
||||||
byte[] kvbuffer; // main output buffer
|
byte[] kvbuffer; // main output buffer
|
||||||
private final byte[] b0 = new byte[0];
|
private final byte[] b0 = new byte[0];
|
||||||
|
|
||||||
private static final int INDEX = 0; // index offset in acct
|
private static final int VALSTART = 0; // val offset in acct
|
||||||
private static final int VALSTART = 1; // val offset in acct
|
private static final int KEYSTART = 1; // key offset in acct
|
||||||
private static final int KEYSTART = 2; // key offset in acct
|
private static final int PARTITION = 2; // partition offset in acct
|
||||||
private static final int PARTITION = 3; // partition offset in acct
|
private static final int VALLEN = 3; // length of value
|
||||||
private static final int NMETA = 4; // num meta ints
|
private static final int NMETA = 4; // num meta ints
|
||||||
private static final int METASIZE = NMETA * 4; // size in bytes
|
private static final int METASIZE = NMETA * 4; // size in bytes
|
||||||
|
|
||||||
|
@ -1151,10 +1151,10 @@ public class MapTask extends Task {
|
||||||
distanceTo(keystart, valend, bufvoid));
|
distanceTo(keystart, valend, bufvoid));
|
||||||
|
|
||||||
// write accounting info
|
// write accounting info
|
||||||
kvmeta.put(kvindex + INDEX, kvindex);
|
|
||||||
kvmeta.put(kvindex + PARTITION, partition);
|
kvmeta.put(kvindex + PARTITION, partition);
|
||||||
kvmeta.put(kvindex + KEYSTART, keystart);
|
kvmeta.put(kvindex + KEYSTART, keystart);
|
||||||
kvmeta.put(kvindex + VALSTART, valstart);
|
kvmeta.put(kvindex + VALSTART, valstart);
|
||||||
|
kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));
|
||||||
// advance kvindex
|
// advance kvindex
|
||||||
kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
|
kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
|
||||||
} catch (MapBufferTooSmallException e) {
|
} catch (MapBufferTooSmallException e) {
|
||||||
|
@ -1224,17 +1224,11 @@ public class MapTask extends Task {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* For the given meta position, return the dereferenced position in the
|
* For the given meta position, return the offset into the int-sized
|
||||||
* integer array. Each meta block contains several integers describing
|
* kvmeta buffer.
|
||||||
* record data in its serialized form, but the INDEX is not necessarily
|
|
||||||
* related to the proximate metadata. The index value at the referenced int
|
|
||||||
* position is the start offset of the associated metadata block. So the
|
|
||||||
* metadata INDEX at metapos may point to the metadata described by the
|
|
||||||
* metadata block at metapos + k, which contains information about that
|
|
||||||
* serialized record.
|
|
||||||
*/
|
*/
|
||||||
int offsetFor(int metapos) {
|
int offsetFor(int metapos) {
|
||||||
return kvmeta.get(metapos * NMETA + INDEX);
|
return metapos * NMETA;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1260,16 +1254,17 @@ public class MapTask extends Task {
|
||||||
kvmeta.get(kvj + VALSTART) - kvmeta.get(kvj + KEYSTART));
|
kvmeta.get(kvj + VALSTART) - kvmeta.get(kvj + KEYSTART));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final byte META_BUFFER_TMP[] = new byte[METASIZE];
|
||||||
/**
|
/**
|
||||||
* Swap logical indices st i, j MOD offset capacity.
|
* Swap metadata for items i, j
|
||||||
* @see IndexedSortable#swap
|
* @see IndexedSortable#swap
|
||||||
*/
|
*/
|
||||||
public void swap(final int mi, final int mj) {
|
public void swap(final int mi, final int mj) {
|
||||||
final int kvi = (mi % maxRec) * NMETA + INDEX;
|
int iOff = (mi % maxRec) * METASIZE;
|
||||||
final int kvj = (mj % maxRec) * NMETA + INDEX;
|
int jOff = (mj % maxRec) * METASIZE;
|
||||||
int tmp = kvmeta.get(kvi);
|
System.arraycopy(kvbuffer, iOff, META_BUFFER_TMP, 0, METASIZE);
|
||||||
kvmeta.put(kvi, kvmeta.get(kvj));
|
System.arraycopy(kvbuffer, jOff, kvbuffer, iOff, METASIZE);
|
||||||
kvmeta.put(kvj, tmp);
|
System.arraycopy(META_BUFFER_TMP, 0, kvbuffer, jOff, METASIZE);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1601,9 +1596,9 @@ public class MapTask extends Task {
|
||||||
while (spindex < mend &&
|
while (spindex < mend &&
|
||||||
kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
|
kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
|
||||||
final int kvoff = offsetFor(spindex % maxRec);
|
final int kvoff = offsetFor(spindex % maxRec);
|
||||||
key.reset(kvbuffer, kvmeta.get(kvoff + KEYSTART),
|
int keystart = kvmeta.get(kvoff + KEYSTART);
|
||||||
(kvmeta.get(kvoff + VALSTART) -
|
int valstart = kvmeta.get(kvoff + VALSTART);
|
||||||
kvmeta.get(kvoff + KEYSTART)));
|
key.reset(kvbuffer, keystart, valstart - keystart);
|
||||||
getVBytesForOffset(kvoff, value);
|
getVBytesForOffset(kvoff, value);
|
||||||
writer.append(key, value);
|
writer.append(key, value);
|
||||||
++spindex;
|
++spindex;
|
||||||
|
@ -1729,14 +1724,8 @@ public class MapTask extends Task {
|
||||||
private void getVBytesForOffset(int kvoff, InMemValBytes vbytes) {
|
private void getVBytesForOffset(int kvoff, InMemValBytes vbytes) {
|
||||||
// get the keystart for the next serialized value to be the end
|
// get the keystart for the next serialized value to be the end
|
||||||
// of this value. If this is the last value in the buffer, use bufend
|
// of this value. If this is the last value in the buffer, use bufend
|
||||||
final int nextindex = kvoff == kvend
|
final int vallen = kvmeta.get(kvoff + VALLEN);
|
||||||
? bufend
|
assert vallen >= 0;
|
||||||
: kvmeta.get(
|
|
||||||
(kvoff - NMETA + kvmeta.capacity() + KEYSTART) % kvmeta.capacity());
|
|
||||||
// calculate the length of the value
|
|
||||||
int vallen = (nextindex >= kvmeta.get(kvoff + VALSTART))
|
|
||||||
? nextindex - kvmeta.get(kvoff + VALSTART)
|
|
||||||
: (bufvoid - kvmeta.get(kvoff + VALSTART)) + nextindex;
|
|
||||||
vbytes.reset(kvbuffer, kvmeta.get(kvoff + VALSTART), vallen);
|
vbytes.reset(kvbuffer, kvmeta.get(kvoff + VALSTART), vallen);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue