MAPREDUCE-6055. native-task: findbugs, interface annotations, and other misc cleanup

This commit is contained in:
Todd Lipcon 2014-09-03 12:02:47 -07:00
parent cce7d1e2f9
commit 1081d9cee2
59 changed files with 264 additions and 148 deletions

View File

@ -19,3 +19,4 @@ MAPREDUCE-5977. Fix or suppress native-task gcc warnings (Manu Zhang via todd)
MAPREDUCE-6054. native-task: Speed up tests (todd)
MAPREDUCE-6058. native-task: KVTest and LargeKVTest should check mr job is sucessful (Binglin Chang)
MAPREDUCE-6056. native-task: move system test working dir to target dir and cleanup test config xml files (Manu Zhang via bchang)
MAPREDUCE-6055. native-task: findbugs, interface annotations, and other misc cleanup (todd)

View File

@ -17,6 +17,9 @@
*/
package org.apache.hadoop.mapred.nativetask;
import org.apache.hadoop.classification.InterfaceAudience;
@InterfaceAudience.Private
public class Command {
private int id;
@ -46,4 +49,9 @@ public class Command {
}
return false;
}
@Override
public int hashCode() {
return id;
}
}

View File

@ -19,20 +19,15 @@
package org.apache.hadoop.mapred.nativetask;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
/**
* a CommandDispatcher receives {@link Command} from upstream
* and performs corresponding operations
*/
@InterfaceAudience.Private
public interface CommandDispatcher {
/**
*
* @param command
* @param parameter
* @return
* @throws IOException
*/
public ReadWriteBuffer onCall(Command command, ReadWriteBuffer parameter) throws IOException;
}

View File

@ -15,9 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.nativetask;
import org.apache.hadoop.classification.InterfaceAudience;
@InterfaceAudience.Private
public class Constants {
public static final String MAP_SORT_CLASS = "map.sort.class";
@ -40,17 +42,17 @@ public class Constants {
public static final String NATIVE_OUTPUT_FILE_NAME = "native.output.file.name";
public static final String NATIVE_PROCESSOR_BUFFER_KB = "native.processor.buffer.kb";
public static int NATIVE_PROCESSOR_BUFFER_KB_DEFAULT = 64;
public static int NATIVE_ASYNC_PROCESSOR_BUFFER_KB_DEFAULT = 1024;
public static final int NATIVE_PROCESSOR_BUFFER_KB_DEFAULT = 64;
public static final int NATIVE_ASYNC_PROCESSOR_BUFFER_KB_DEFAULT = 1024;
public static final String NATIVE_STATUS_UPDATE_INTERVAL = "native.update.interval";
public static int NATIVE_STATUS_UPDATE_INTERVAL_DEFVAL = 3000;
public static final int NATIVE_STATUS_UPDATE_INTERVAL_DEFVAL = 3000;
public static final String SERIALIZATION_FRAMEWORK = "SerializationFramework";
public static int SIZEOF_PARTITION_LENGTH = 4;
public static int SIZEOF_KEY_LENGTH = 4;
public static int SIZEOF_VALUE_LENGTH = 4;
public static int SIZEOF_KV_LENGTH = SIZEOF_KEY_LENGTH + SIZEOF_VALUE_LENGTH;
public static final int SIZEOF_PARTITION_LENGTH = 4;
public static final int SIZEOF_KEY_LENGTH = 4;
public static final int SIZEOF_VALUE_LENGTH = 4;
public static final int SIZEOF_KV_LENGTH = SIZEOF_KEY_LENGTH + SIZEOF_VALUE_LENGTH;
public static final String NATIVE_CLASS_LIBRARY = "native.class.library";
public static final String NATIVE_CLASS_LIBRARY_CUSTOM = "native.class.library.custom";

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.mapred.nativetask;
import org.apache.hadoop.classification.InterfaceAudience;
@InterfaceAudience.Private
public enum DataChannel {
/**
* We will only read data from this channel

View File

@ -20,10 +20,13 @@ package org.apache.hadoop.mapred.nativetask;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* a DataReceiver pulls in arriving data, an example
* is {@link org.apache.hadoop.mapred.nativetask.handlers.BufferPuller}
*/
@InterfaceAudience.Private
public interface DataReceiver {
/**

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.BytesWritable;
@ -36,6 +37,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.nativetask.serde.*;
@InterfaceAudience.Private
public class HadoopPlatform extends Platform {
private static final Log LOG = LogFactory.getLog(HadoopPlatform.class);
@ -61,7 +63,7 @@ public class HadoopPlatform extends Platform {
}
@Override
public boolean support(String keyClassName, INativeSerializer serializer, JobConf job) {
public boolean support(String keyClassName, INativeSerializer<?> serializer, JobConf job) {
if (keyClassNames.contains(keyClassName)
&& serializer instanceof INativeComparable) {
return true;
@ -71,7 +73,7 @@ public class HadoopPlatform extends Platform {
}
@Override
public boolean define(Class comparatorClass) {
public boolean define(Class<?> comparatorClass) {
return false;
}

View File

@ -19,9 +19,12 @@ package org.apache.hadoop.mapred.nativetask;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* interacts with native side to support Java Combiner
*/
@InterfaceAudience.Private
public interface ICombineHandler {
/**

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.mapred.nativetask;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
*
* Any key type that is comparable at native side must implement this interface
@ -45,6 +48,8 @@ package org.apache.hadoop.mapred.nativetask;
* return NativeObjectFactory::BytesComparator(src + 4, sl, dest + 4, dl);
* }
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface INativeComparable {
}

View File

@ -19,12 +19,14 @@ package org.apache.hadoop.mapred.nativetask;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
/**
* A Handler accept input, and give output can be used to transfer command and data
*/
@InterfaceAudience.Private
public interface INativeHandler extends NativeDataTarget, NativeDataSource {
public String name();

View File

@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.nativetask.buffer.BufferType;
@ -30,11 +31,11 @@ import org.apache.hadoop.mapred.nativetask.buffer.InputBuffer;
import org.apache.hadoop.mapred.nativetask.buffer.OutputBuffer;
import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
import org.apache.hadoop.mapred.nativetask.util.ConfigUtil;
import org.apache.hadoop.util.DirectBufferPool;
/**
* used to create channel, transfer data and command between Java and native
*/
@InterfaceAudience.Private
public class NativeBatchProcessor implements INativeHandler {
private static Log LOG = LogFactory.getLog(NativeBatchProcessor.class);

View File

@ -20,33 +20,27 @@ package org.apache.hadoop.mapred.nativetask;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.mapred.nativetask.buffer.InputBuffer;
import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
/**
* NativeDataSource loads data from upstream
*/
@InterfaceAudience.Private
public interface NativeDataSource {
/**
* get input buffer
*
* @return
*/
public InputBuffer getInputBuffer();
/**
* set listener. When data from upstream arrives, the listener will be activated.
*
* @param handler
*/
void setDataReceiver(DataReceiver handler);
/**
* load data from upstream
*
* @return
* @throws IOException
*/
public void loadData() throws IOException;

View File

@ -20,31 +20,27 @@ package org.apache.hadoop.mapred.nativetask;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.mapred.nativetask.buffer.OutputBuffer;
/**
* NativeDataTarge sends data to downstream
*/
@InterfaceAudience.Private
public interface NativeDataTarget {
/**
* send a signal to indicate that the data has been stored in output buffer
*
* @throws IOException
* Sends a signal to indicate that the data has been stored in output buffer
*/
public void sendData() throws IOException;
/**
* Send a signal that there is no more data
*
* @throws IOException
* Sends a signal that there is no more data
*/
public void finishSendData() throws IOException;
/**
* get the output buffer.
*
* @return
* Gets the output buffer.
*/
public OutputBuffer getOutputBuffer();

View File

@ -17,13 +17,13 @@
*/
package org.apache.hadoop.mapred.nativetask;
import java.io.File;
import java.io.IOException;
import com.google.common.base.Charsets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapred.JobConf;
@ -34,13 +34,12 @@ import org.apache.hadoop.mapred.nativetask.serde.INativeSerializer;
import org.apache.hadoop.mapred.nativetask.serde.NativeSerialization;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.util.QuickSort;
import org.apache.hadoop.util.RunJar;
/**
* native map output collector wrapped in Java interface
*/
@InterfaceAudience.Private
public class NativeMapOutputCollectorDelegator<K, V> implements MapOutputCollector<K, V> {
private static Log LOG = LogFactory.getLog(NativeMapOutputCollectorDelegator.class);
@ -67,6 +66,7 @@ public class NativeMapOutputCollectorDelegator<K, V> implements MapOutputCollect
handler.flush();
}
@SuppressWarnings("unchecked")
@Override
public void init(Context context) throws IOException, ClassNotFoundException {
this.job = context.getJobConf();
@ -79,7 +79,8 @@ public class NativeMapOutputCollectorDelegator<K, V> implements MapOutputCollect
throw new InvalidJobConfException(message);
}
Class comparatorClass = job.getClass(MRJobConfig.KEY_COMPARATOR, null, RawComparator.class);
Class<?> comparatorClass = job.getClass(MRJobConfig.KEY_COMPARATOR, null,
RawComparator.class);
if (comparatorClass != null && !Platforms.define(comparatorClass)) {
String message = "Native output collector doesn't support customized java comparator "
+ job.get(MRJobConfig.KEY_COMPARATOR);

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import com.google.common.base.Charsets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.FloatWritable;
@ -34,11 +35,12 @@ import org.apache.hadoop.mapred.nativetask.util.ConfigUtil;
import org.apache.hadoop.util.VersionInfo;
/**
* This class stands for the native runtime It has three functions: 1. Create native handlers for map, reduce,
* outputcollector, and etc 2. Configure native task with provided MR configs 3. Provide file system api to native
* space, so that it can use File system like HDFS.
*
* This class stands for the native runtime It has three functions:
* 1. Create native handlers for map, reduce, outputcollector, etc
* 2. Configure native task with provided MR configs
* 3. Provide file system api to native space, so that it can use File system like HDFS.
*/
@InterfaceAudience.Private
public class NativeRuntime {
private static Log LOG = LogFactory.getLog(NativeRuntime.class);
private static boolean nativeLibraryLoaded = false;

View File

@ -18,13 +18,11 @@
package org.apache.hadoop.mapred.nativetask;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.nativetask.serde.INativeSerializer;
import org.apache.hadoop.mapred.nativetask.serde.NativeSerialization;
@ -39,6 +37,8 @@ import org.apache.hadoop.mapred.nativetask.serde.NativeSerialization;
* that supports all key types of Hadoop and users could implement their custom
* platform.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class Platform {
private final NativeSerialization serialization;
protected Set<String> keyClassNames = new HashSet<String>();
@ -67,7 +67,7 @@ public abstract class Platform {
* @param key key serializer class
* @throws IOException
*/
protected void registerKey(String keyClassName, Class key) throws IOException {
protected void registerKey(String keyClassName, Class<?> key) throws IOException {
serialization.register(keyClassName, key);
keyClassNames.add(keyClassName);
}
@ -85,7 +85,8 @@ public abstract class Platform {
* @return true if the platform has implemented native comparators of the key and
* false otherwise
*/
protected abstract boolean support(String keyClassName, INativeSerializer serializer, JobConf job);
protected abstract boolean support(String keyClassName,
INativeSerializer<?> serializer, JobConf job);
/**
@ -98,5 +99,5 @@ public abstract class Platform {
* @param keyComparator comparator set with mapreduce.job.output.key.comparator.class
* @return
*/
protected abstract boolean define(Class keyComparator);
protected abstract boolean define(Class<?> keyComparator);
}

View File

@ -22,6 +22,7 @@ import java.util.ServiceLoader;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.nativetask.serde.INativeSerializer;
@ -33,6 +34,7 @@ import org.apache.hadoop.mapred.nativetask.serde.NativeSerialization;
* it is also the facade to check for key type support and other
* platform methods
*/
@InterfaceAudience.Private
public class Platforms {
private static final Log LOG = LogFactory.getLog(Platforms.class);
@ -48,7 +50,8 @@ public class Platforms {
}
}
public static boolean support(String keyClassName, INativeSerializer serializer, JobConf job) {
public static boolean support(String keyClassName,
INativeSerializer<?> serializer, JobConf job) {
synchronized (platforms) {
for (Platform platform : platforms) {
if (platform.support(keyClassName, serializer, job)) {
@ -61,7 +64,7 @@ public class Platforms {
return false;
}
public static boolean define(Class keyComparator) {
public static boolean define(Class<?> keyComparator) {
synchronized (platforms) {
for (Platform platform : platforms) {
if (platform.define(keyComparator)) {

View File

@ -21,17 +21,18 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.Task.Counter;
import org.apache.hadoop.mapred.Task.TaskReporter;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
/**
* Will periodically check status from native and report to MR framework.
*
*/
public class StatusReportChecker implements Runnable {
class StatusReportChecker implements Runnable {
private static Log LOG = LogFactory.getLog(StatusReportChecker.class);
public static int INTERVAL = 1000; // milli-seconds
public static final int INTERVAL = 1000; // milliseconds
private Thread checker;
private final TaskReporter reporter;
@ -68,19 +69,19 @@ public class StatusReportChecker implements Runnable {
}
protected void initUsedCounters() {
reporter.getCounter(Counter.MAP_INPUT_RECORDS);
reporter.getCounter(Counter.MAP_OUTPUT_RECORDS);
reporter.getCounter(Counter.MAP_INPUT_BYTES);
reporter.getCounter(Counter.MAP_OUTPUT_BYTES);
reporter.getCounter(Counter.MAP_OUTPUT_MATERIALIZED_BYTES);
reporter.getCounter(Counter.COMBINE_INPUT_RECORDS);
reporter.getCounter(Counter.COMBINE_OUTPUT_RECORDS);
reporter.getCounter(Counter.REDUCE_INPUT_RECORDS);
reporter.getCounter(Counter.REDUCE_OUTPUT_RECORDS);
reporter.getCounter(Counter.REDUCE_INPUT_GROUPS);
reporter.getCounter(Counter.SPILLED_RECORDS);
reporter.getCounter(Counter.MAP_OUTPUT_BYTES);
reporter.getCounter(Counter.MAP_OUTPUT_RECORDS);
reporter.getCounter(TaskCounter.MAP_INPUT_RECORDS);
reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
reporter.getCounter(FileInputFormatCounter.BYTES_READ);
reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
reporter.getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);
reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
reporter.getCounter(TaskCounter.REDUCE_INPUT_RECORDS);
reporter.getCounter(TaskCounter.REDUCE_OUTPUT_RECORDS);
reporter.getCounter(TaskCounter.REDUCE_INPUT_GROUPS);
reporter.getCounter(TaskCounter.SPILLED_RECORDS);
reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
}
public synchronized void start() {

View File

@ -17,20 +17,23 @@
*/
package org.apache.hadoop.mapred.nativetask;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Task.TaskReporter;
import org.apache.hadoop.mapred.TaskAttemptID;
@InterfaceAudience.Private
public class TaskContext {
private final JobConf conf;
private Class iKClass;
private Class iVClass;
private Class oKClass;
private Class oVClass;
private Class<?> iKClass;
private Class<?> iVClass;
private Class<?> oKClass;
private Class<?> oVClass;
private final TaskReporter reporter;
private final TaskAttemptID taskAttemptID;
public TaskContext(JobConf conf, Class iKClass, Class iVClass, Class oKClass, Class oVClass, TaskReporter reporter,
public TaskContext(JobConf conf, Class<?> iKClass, Class<?> iVClass,
Class<?> oKClass, Class<?> oVClass, TaskReporter reporter,
TaskAttemptID id) {
this.conf = conf;
this.iKClass = iKClass;
@ -41,35 +44,35 @@ public class TaskContext {
this.taskAttemptID = id;
}
public Class getInputKeyClass() {
public Class<?> getInputKeyClass() {
return iKClass;
}
public void setInputKeyClass(Class klass) {
public void setInputKeyClass(Class<?> klass) {
this.iKClass = klass;
}
public Class getInputValueClass() {
public Class<?> getInputValueClass() {
return iVClass;
}
public void setInputValueClass(Class klass) {
public void setInputValueClass(Class<?> klass) {
this.iVClass = klass;
}
public Class getOuputKeyClass() {
public Class<?> getOutputKeyClass() {
return this.oKClass;
}
public void setOutputKeyClass(Class klass) {
public void setOutputKeyClass(Class<?> klass) {
this.oKClass = klass;
}
public Class getOutputValueClass() {
public Class<?> getOutputValueClass() {
return this.oVClass;
}
public void setOutputValueClass(Class klass) {
public void setOutputValueClass(Class<?> klass) {
this.oVClass = klass;
}

View File

@ -18,9 +18,10 @@
package org.apache.hadoop.mapred.nativetask.buffer;
import org.apache.hadoop.classification.InterfaceAudience;
@InterfaceAudience.Private
public enum BufferType {
DIRECT_BUFFER,
HEAP_BUFFER
};

View File

@ -17,22 +17,17 @@
*/
package org.apache.hadoop.mapred.nativetask.buffer;
import com.google.common.base.Charsets;
import java.io.DataInput;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.io.UTFDataFormatException;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* read data from a input buffer
*/
@InterfaceAudience.Private
public class ByteBufferDataReader extends DataInputStream {
private ByteBuffer byteBuffer;
private char lineCache[];
private java.io.DataInputStream javaReader;
public ByteBufferDataReader(InputBuffer buffer) {
@ -130,6 +125,7 @@ public class ByteBufferDataReader extends DataInputStream {
return byteBuffer.getDouble();
}
@SuppressWarnings("deprecation")
@Override
public String readLine() throws IOException {
return javaReader.readLine();

View File

@ -17,23 +17,21 @@
*/
package org.apache.hadoop.mapred.nativetask.buffer;
import java.io.DataOutput;
import java.io.IOException;
import java.io.UTFDataFormatException;
import java.nio.ByteBuffer;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.primitives.UnsignedInteger;
import com.google.common.primitives.UnsignedInts;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.mapred.nativetask.NativeDataTarget;
import com.google.common.base.Preconditions;
/**
* DataOutputStream implementation which buffers data in a fixed-size
* ByteBuffer.
* When the byte buffer has filled up, synchronously passes the buffer
* to a downstream NativeDataTarget.
*/
@InterfaceAudience.Private
public class ByteBufferDataWriter extends DataOutputStream {
private final ByteBuffer buffer;
private final NativeDataTarget target;

View File

@ -19,7 +19,9 @@ package org.apache.hadoop.mapred.nativetask.buffer;
import java.io.DataInput;
import java.io.InputStream;
import org.apache.hadoop.classification.InterfaceAudience;
@InterfaceAudience.Private
public abstract class DataInputStream extends InputStream implements DataInput {
public abstract boolean hasUnReadData();
}

View File

@ -21,22 +21,19 @@ import java.io.DataOutput;
import java.io.IOException;
import java.io.OutputStream;
public abstract class DataOutputStream extends OutputStream implements DataOutput {
import org.apache.hadoop.classification.InterfaceAudience;
@InterfaceAudience.Private
public abstract class DataOutputStream extends OutputStream implements DataOutput {
/**
* Check whether this buffer has enough space to store length of bytes
*
* @param length
* , length of bytes
* @return
* @throws IOException
* @param length length of bytes
*/
public abstract boolean shortOfSpace(int length) throws IOException;
/**
* Check whether there is unflushed data stored in the stream
*
* @return
*/
public abstract boolean hasUnFlushedData();
}

View File

@ -19,12 +19,14 @@
package org.apache.hadoop.mapred.nativetask.buffer;
import org.apache.hadoop.util.DirectBufferPool;
import org.apache.hadoop.classification.InterfaceAudience;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@InterfaceAudience.Private
public class InputBuffer implements Closeable {
static DirectBufferPool bufferPool = new DirectBufferPool();

View File

@ -20,7 +20,9 @@ package org.apache.hadoop.mapred.nativetask.buffer;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.hadoop.classification.InterfaceAudience;
@InterfaceAudience.Private
public class OutputBuffer {
protected ByteBuffer byteBuffer;
private final BufferType type;

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.mapred.nativetask.handlers;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.mapred.nativetask.Constants;
import org.apache.hadoop.mapred.nativetask.NativeDataTarget;
@ -30,9 +31,10 @@ import org.apache.hadoop.mapred.nativetask.util.SizedWritable;
/**
* load data into a buffer signaled by a {@link BufferPuller}
*/
@InterfaceAudience.Private
public class BufferPullee<IK, IV> implements IDataLoader {
public static int KV_HEADER_LENGTH = Constants.SIZEOF_KV_LENGTH;
public static final int KV_HEADER_LENGTH = Constants.SIZEOF_KV_LENGTH;
private final SizedWritable<IK> tmpInputKey;
private final SizedWritable<IV> tmpInputValue;

View File

@ -1,10 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.nativetask.handlers;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.mapred.nativetask.Constants;
@ -13,13 +32,12 @@ import org.apache.hadoop.mapred.nativetask.NativeDataSource;
import org.apache.hadoop.mapred.nativetask.buffer.BufferType;
import org.apache.hadoop.mapred.nativetask.buffer.ByteBufferDataReader;
import org.apache.hadoop.mapred.nativetask.buffer.InputBuffer;
import org.apache.hadoop.mapred.nativetask.serde.SerializationFramework;
import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
import org.apache.hadoop.util.Progress;
/**
* actively signal a {@link BufferPullee} to load data into buffer and receive
*/
@InterfaceAudience.Private
public class BufferPuller implements RawKeyValueIterator, DataReceiver {
private static Log LOG = LogFactory.getLog(BufferPuller.class);
@ -108,8 +126,8 @@ public class BufferPuller implements RawKeyValueIterator, DataReceiver {
valueBytes = new byte[valueLength];
}
nativeReader.read(keyBytes, 0, keyLength);
nativeReader.read(valueBytes, 0, valueLength);
IOUtils.readFully(nativeReader, keyBytes, 0, keyLength);
IOUtils.readFully(nativeReader, valueBytes, 0, valueLength);
keyBuffer.reset(keyBytes, keyLength);
valueBuffer.reset(valueBytes, valueLength);

View File

@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.nativetask.Constants;
import org.apache.hadoop.mapred.nativetask.buffer.BufferType;
@ -34,6 +35,7 @@ import org.apache.hadoop.mapred.nativetask.util.SizedWritable;
/**
* collect data when signaled
*/
@InterfaceAudience.Private
public class BufferPushee<OK, OV> implements Closeable {
private static Log LOG = LogFactory.getLog(BufferPushee.class);

View File

@ -22,7 +22,7 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.nativetask.NativeDataTarget;
import org.apache.hadoop.mapred.nativetask.buffer.ByteBufferDataWriter;
@ -33,6 +33,7 @@ import org.apache.hadoop.mapred.nativetask.util.SizedWritable;
/**
* actively push data into a buffer and signal a {@link BufferPushee} to collect it
*/
@InterfaceAudience.Private
public class BufferPusher<K, V> implements OutputCollector<K, V> {
private static Log LOG = LogFactory.getLog(BufferPusher.class);

View File

@ -17,8 +17,6 @@
*/
package org.apache.hadoop.mapred.nativetask.handlers;
import static org.apache.hadoop.mapred.Task.Counter.COMBINE_INPUT_RECORDS;
import java.io.IOException;
import org.apache.commons.logging.Log;
@ -37,13 +35,13 @@ import org.apache.hadoop.mapred.nativetask.TaskContext;
import org.apache.hadoop.mapred.nativetask.serde.SerializationFramework;
import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskCounter;
public class CombinerHandler<K, V> implements ICombineHandler, CommandDispatcher {
public static String NAME = "NativeTask.CombineHandler";
class CombinerHandler<K, V> implements ICombineHandler, CommandDispatcher {
public static final String NAME = "NativeTask.CombineHandler";
private static Log LOG = LogFactory.getLog(NativeCollectorOnlyHandler.class);
public static Command LOAD = new Command(1, "Load");
public static Command COMBINE = new Command(4, "Combine");
public static final Command LOAD = new Command(1, "Load");
public static final Command COMBINE = new Command(4, "Combine");
public final CombinerRunner<K, V> combinerRunner;
private final INativeHandler nativeHandler;
@ -66,13 +64,16 @@ public class CombinerHandler<K, V> implements ICombineHandler, CommandDispatcher
LOG.info("NativeTask Combiner is enabled, class = " + combinerClazz);
}
final Counter combineInputCounter = context.getTaskReporter().getCounter(COMBINE_INPUT_RECORDS);
final Counter combineInputCounter = context.getTaskReporter().getCounter(
TaskCounter.COMBINE_INPUT_RECORDS);
final CombinerRunner<K, V> combinerRunner = CombinerRunner.create(conf, context.getTaskAttemptId(),
combineInputCounter, context.getTaskReporter(), null);
final INativeHandler nativeHandler = NativeBatchProcessor.create(NAME, conf, DataChannel.INOUT);
final BufferPusher<K, V> pusher = new BufferPusher<K, V>(context.getInputKeyClass(), context.getInputValueClass(),
@SuppressWarnings("unchecked")
final BufferPusher<K, V> pusher = new BufferPusher<K, V>((Class<K>)context.getInputKeyClass(),
(Class<V>)context.getInputValueClass(),
nativeHandler);
final BufferPuller puller = new BufferPuller(nativeHandler);
return new CombinerHandler<K, V>(nativeHandler, combinerRunner, puller, pusher);

View File

@ -20,9 +20,12 @@ package org.apache.hadoop.mapred.nativetask.handlers;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* an IDataLoader loads data on demand
*/
@InterfaceAudience.Private
public interface IDataLoader {
/**

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.TaskAttemptID;
@ -41,14 +42,19 @@ import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
* Java Record Reader + Java Mapper + Native Collector
*/
@SuppressWarnings("unchecked")
@InterfaceAudience.Private
public class NativeCollectorOnlyHandler<K, V> implements CommandDispatcher, Closeable {
public static String NAME = "NativeTask.MCollectorOutputHandler";
public static final String NAME = "NativeTask.MCollectorOutputHandler";
private static Log LOG = LogFactory.getLog(NativeCollectorOnlyHandler.class);
public static Command GET_OUTPUT_PATH = new Command(100, "GET_OUTPUT_PATH");
public static Command GET_OUTPUT_INDEX_PATH = new Command(101, "GET_OUTPUT_INDEX_PATH");
public static Command GET_SPILL_PATH = new Command(102, "GET_SPILL_PATH");
public static Command GET_COMBINE_HANDLER = new Command(103, "GET_COMBINE_HANDLER");
public static final Command GET_OUTPUT_PATH =
new Command(100, "GET_OUTPUT_PATH");
public static final Command GET_OUTPUT_INDEX_PATH =
new Command(101, "GET_OUTPUT_INDEX_PATH");
public static final Command GET_SPILL_PATH =
new Command(102, "GET_SPILL_PATH");
public static final Command GET_COMBINE_HANDLER =
new Command(103, "GET_COMBINE_HANDLER");
private NativeTaskOutput output;
private int spillNumber = 0;
@ -63,7 +69,7 @@ public class NativeCollectorOnlyHandler<K, V> implements CommandDispatcher, Clos
ICombineHandler combinerHandler = null;
try {
final TaskContext combineContext = context.copyOf();
combineContext.setInputKeyClass(context.getOuputKeyClass());
combineContext.setInputKeyClass(context.getOutputKeyClass());
combineContext.setInputValueClass(context.getOutputValueClass());
combinerHandler = CombinerHandler.create(combineContext);
@ -76,7 +82,9 @@ public class NativeCollectorOnlyHandler<K, V> implements CommandDispatcher, Clos
}
final INativeHandler nativeHandler = NativeBatchProcessor.create(NAME, context.getConf(), DataChannel.OUT);
final BufferPusher<K, V> kvPusher = new BufferPusher<K, V>(context.getOuputKeyClass(), context.getOutputValueClass(),
final BufferPusher<K, V> kvPusher = new BufferPusher<K, V>(
(Class<K>)context.getOutputKeyClass(),
(Class<V>)context.getOutputValueClass(),
nativeHandler);
return new NativeCollectorOnlyHandler<K, V>(context, nativeHandler, kvPusher, combinerHandler);

View File

@ -20,9 +20,11 @@ package org.apache.hadoop.mapred.nativetask.serde;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.nativetask.INativeComparable;
@InterfaceAudience.Private
public class BoolWritableSerializer extends DefaultSerializer implements
INativeComparable {

View File

@ -20,9 +20,11 @@ package org.apache.hadoop.mapred.nativetask.serde;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.nativetask.INativeComparable;
@InterfaceAudience.Private
public class ByteWritableSerializer extends DefaultSerializer implements
INativeComparable {

View File

@ -22,9 +22,11 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.nativetask.INativeComparable;
@InterfaceAudience.Private
public class BytesWritableSerializer implements INativeComparable, INativeSerializer<BytesWritable> {
@Override

View File

@ -24,8 +24,12 @@ import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.Writable;
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class DefaultSerializer implements INativeSerializer<Writable> {
static class ModifiedByteArrayOutputStream extends ByteArrayOutputStream {

View File

@ -20,9 +20,11 @@ package org.apache.hadoop.mapred.nativetask.serde;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.nativetask.INativeComparable;
@InterfaceAudience.Private
public class DoubleWritableSerializer extends DefaultSerializer implements
INativeComparable {

View File

@ -20,9 +20,12 @@ package org.apache.hadoop.mapred.nativetask.serde;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.nativetask.INativeComparable;
@InterfaceAudience.Private
public class FloatWritableSerializer extends DefaultSerializer implements
INativeComparable {

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.mapred.nativetask.serde;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.mapred.nativetask.buffer.DataInputStream;
import org.apache.hadoop.mapred.nativetask.buffer.DataOutputStream;
import org.apache.hadoop.mapred.nativetask.util.SizedWritable;
@ -27,6 +28,7 @@ import org.apache.hadoop.mapred.nativetask.util.SizedWritable;
/**
* serializes key-value pair
*/
@InterfaceAudience.Private
public interface IKVSerializer {
/**
@ -35,7 +37,7 @@ public interface IKVSerializer {
* @param value
* @throws IOException
*/
public void updateLength(SizedWritable key, SizedWritable value) throws IOException;
public void updateLength(SizedWritable<?> key, SizedWritable<?> value) throws IOException;
/**
*
@ -45,7 +47,8 @@ public interface IKVSerializer {
* @return bytes written
* @throws IOException
*/
public int serializeKV(DataOutputStream out, SizedWritable key, SizedWritable value) throws IOException;
public int serializeKV(DataOutputStream out, SizedWritable<?> key,
SizedWritable<?> value) throws IOException;
/**
* serialize partitionId as well
@ -56,7 +59,8 @@ public interface IKVSerializer {
* @return
* @throws IOException
*/
public int serializePartitionKV(DataOutputStream out, int partitionId, SizedWritable key, SizedWritable value)
public int serializePartitionKV(DataOutputStream out, int partitionId,
SizedWritable<?> key, SizedWritable<?> value)
throws IOException;
/**
@ -67,5 +71,5 @@ public interface IKVSerializer {
* @return bytes read
* @throws IOException
*/
public int deserializeKV(DataInputStream in, SizedWritable key, SizedWritable value) throws IOException;
public int deserializeKV(DataInputStream in, SizedWritable<?> key, SizedWritable<?> value) throws IOException;
}

View File

@ -21,6 +21,8 @@ package org.apache.hadoop.mapred.nativetask.serde;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* an INativeSerializer serializes and deserializes data transferred between
@ -30,6 +32,8 @@ import java.io.IOException;
* you have to make sure the native side can serialize it correctly.
*
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface INativeSerializer<T> {
/**

View File

@ -20,9 +20,11 @@ package org.apache.hadoop.mapred.nativetask.serde;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.nativetask.INativeComparable;
@InterfaceAudience.Private
public class IntWritableSerializer extends DefaultSerializer implements
INativeComparable {

View File

@ -22,18 +22,21 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.nativetask.Constants;
import org.apache.hadoop.mapred.nativetask.buffer.DataInputStream;
import org.apache.hadoop.mapred.nativetask.buffer.DataOutputStream;
import org.apache.hadoop.mapred.nativetask.util.SizedWritable;
public class KVSerializer<K, V> implements IKVSerializer {
@InterfaceAudience.Private
public class KVSerializer<K, V> implements IKVSerializer {
private static final Log LOG = LogFactory.getLog(KVSerializer.class);
public static int KV_HEAD_LENGTH = Constants.SIZEOF_KV_LENGTH;
public static final int KV_HEAD_LENGTH = Constants.SIZEOF_KV_LENGTH;
private final INativeSerializer<Writable> keySerializer;
private final INativeSerializer<Writable> valueSerializer;
@ -45,19 +48,20 @@ public class KVSerializer<K, V> implements IKVSerializer {
}
@Override
public void updateLength(SizedWritable key, SizedWritable value) throws IOException {
public void updateLength(SizedWritable<?> key, SizedWritable<?> value) throws IOException {
key.length = keySerializer.getLength(key.v);
value.length = valueSerializer.getLength(value.v);
return;
}
@Override
public int serializeKV(DataOutputStream out, SizedWritable key, SizedWritable value) throws IOException {
public int serializeKV(DataOutputStream out, SizedWritable<?> key, SizedWritable<?> value) throws IOException {
return serializePartitionKV(out, -1, key, value);
}
@Override
public int serializePartitionKV(DataOutputStream out, int partitionId, SizedWritable key, SizedWritable value)
public int serializePartitionKV(DataOutputStream out, int partitionId,
SizedWritable<?> key, SizedWritable<?> value)
throws IOException {
if (key.length == SizedWritable.INVALID_LENGTH || value.length == SizedWritable.INVALID_LENGTH) {
@ -90,7 +94,8 @@ public class KVSerializer<K, V> implements IKVSerializer {
}
@Override
public int deserializeKV(DataInputStream in, SizedWritable key, SizedWritable value) throws IOException {
public int deserializeKV(DataInputStream in, SizedWritable<?> key,
SizedWritable<?> value) throws IOException {
if (!in.hasUnReadData()) {
return 0;

View File

@ -20,9 +20,11 @@ package org.apache.hadoop.mapred.nativetask.serde;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.nativetask.INativeComparable;
@InterfaceAudience.Private
public class LongWritableSerializer extends DefaultSerializer implements
INativeComparable {
@Override

View File

@ -21,8 +21,10 @@ package org.apache.hadoop.mapred.nativetask.serde;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.Writable;
@InterfaceAudience.Private
public class NativeSerialization {
private final ConcurrentHashMap<String, Class<?>> map = new ConcurrentHashMap<String, Class<?>>();

View File

@ -20,9 +20,11 @@ package org.apache.hadoop.mapred.nativetask.serde;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.nativetask.INativeComparable;
@InterfaceAudience.Private
public class NullWritableSerializer extends DefaultSerializer implements
INativeComparable {

View File

@ -17,6 +17,9 @@
*/
package org.apache.hadoop.mapred.nativetask.serde;
import org.apache.hadoop.classification.InterfaceAudience;
@InterfaceAudience.Private
public enum SerializationFramework {
WRITABLE_SERIALIZATION(0), NATIVE_SERIALIZATION(1);

View File

@ -22,9 +22,11 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.nativetask.INativeComparable;
@InterfaceAudience.Private
public class TextSerializer implements INativeSerializer<Text>, INativeComparable {
public TextSerializer() throws SecurityException, NoSuchMethodException {

View File

@ -18,9 +18,10 @@
package org.apache.hadoop.mapred.nativetask.serde;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.mapred.nativetask.INativeComparable;
@InterfaceAudience.Private
public class VIntWritableSerializer extends DefaultSerializer implements
INativeComparable {
}

View File

@ -18,9 +18,10 @@
package org.apache.hadoop.mapred.nativetask.serde;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.mapred.nativetask.INativeComparable;
@InterfaceAudience.Private
public class VLongWritableSerializer extends DefaultSerializer implements
INativeComparable {
}

View File

@ -20,7 +20,9 @@ package org.apache.hadoop.mapred.nativetask.util;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import org.apache.hadoop.classification.InterfaceAudience;
@InterfaceAudience.Private
public class BytesUtil {
private static final char[] HEX_CHARS =

View File

@ -24,7 +24,10 @@ import java.util.Map;
import com.google.common.base.Charsets;
import org.apache.hadoop.conf.Configuration;
public class ConfigUtil {
import org.apache.hadoop.classification.InterfaceAudience;
@InterfaceAudience.Private
public abstract class ConfigUtil {
public static byte[][] toBytes(Configuration conf) {
List<byte[]> nativeConfigs = new ArrayList<byte[]>();
for (Map.Entry<String, String> e : conf) {

View File

@ -19,12 +19,14 @@ package org.apache.hadoop.mapred.nativetask.util;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TaskID;
@InterfaceAudience.Private
public class LocalJobOutputFiles implements NativeTaskOutput {
static final String TASKTRACKER_OUTPUT = "output";

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.mapred.nativetask.util;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.TaskID;
@ -26,6 +27,7 @@ import org.apache.hadoop.mapred.TaskID;
/**
* base class of output files manager.
*/
@InterfaceAudience.Private
public interface NativeTaskOutput {
/**

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.mapred.nativetask.util;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
@ -33,7 +34,7 @@ import org.apache.hadoop.mapred.TaskID;
* mapreduce.cluster.local.dir as taskTracker/jobCache/jobId/attemptId This class should not be used
* from TaskTracker space.
*/
@InterfaceAudience.Private
public class NativeTaskOutputFiles implements NativeTaskOutput {
static final String TASKTRACKER_OUTPUT = "output";

View File

@ -22,9 +22,10 @@ import java.lang.reflect.Constructor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@InterfaceAudience.Private
public class OutputUtil {
private static Log LOG = LogFactory.getLog(OutputUtil.class);
@ -40,7 +41,7 @@ public class OutputUtil {
NativeTaskOutput instance = (NativeTaskOutput) ctor.newInstance(conf, id);
return instance;
} catch (Exception e) {
return null;
throw new RuntimeException(e);
}
}
}

View File

@ -15,16 +15,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.nativetask.util;
import com.google.common.base.Charsets;
import org.apache.hadoop.classification.InterfaceAudience;
@InterfaceAudience.Private
public class ReadWriteBuffer {
private byte[] _buff;
private int _writePoint;
private int _readPoint;
final int CACHE_LINE_SIZE = 16;
final static int CACHE_LINE_SIZE = 16;
public ReadWriteBuffer(int length) {
if (length > 0) {

View File

@ -19,12 +19,14 @@ package org.apache.hadoop.mapred.nativetask.util;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
@InterfaceAudience.Private
public class SizedWritable<T> {
public static int INVALID_LENGTH = -1;
public static final int INVALID_LENGTH = -1;
public int length = INVALID_LENGTH;
public Writable v;

View File

@ -30,13 +30,13 @@ public class TestTaskContext extends TestCase {
TaskContext context = new TaskContext(null, null, null, null, null, null, null);
context.setInputKeyClass(IntWritable.class);
assertEquals(IntWritable.class.getName(), context.getInputKeyClass().getName());
assertEquals(IntWritable.class.getName(), context.getInputKeyClass().getName());
context.setInputValueClass(Text.class);
assertEquals(Text.class.getName(), context.getInputValueClass().getName());
context.setOutputKeyClass(LongWritable.class);
assertEquals(LongWritable.class.getName(), context.getOuputKeyClass().getName());
assertEquals(LongWritable.class.getName(), context.getOutputKeyClass().getName());
context.setOutputValueClass(FloatWritable.class);
assertEquals(FloatWritable.class.getName(), context.getOutputValueClass().getName());