MAPREDUCE-6005. native-task: Fix some valgrind errors (Binglin Chang)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/MR-2841@1615489 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
78d86a9839
commit
83a396733e
|
@ -8,3 +8,4 @@ MAPREDUCE-5997. native-task: Use DirectBufferPool from Hadoop Common (todd)
|
|||
MAPREDUCE-6000. native-task: Simplify ByteBufferDataReader/Writer (todd)
|
||||
MAPREDUCE-5991. native-task should not run unit tests if native profile is not enabled. (Binglin Chang)
|
||||
MAPREDUCE-5995. native-task: Revert changes to Text internals (todd)
|
||||
MAPREDUCE-6005. native-task: Fix some valgrind errors (Binglin Chang)
|
||||
|
|
|
@ -28,7 +28,6 @@ GzipCompressStream::GzipCompressStream(OutputStream * stream, uint32_t bufferSiz
|
|||
: CompressStream(stream), _compressedBytesWritten(0), _zstream(NULL), _finished(false) {
|
||||
_buffer = new char[bufferSizeHint];
|
||||
_capacity = bufferSizeHint;
|
||||
std::cout << "gzip capacity " << _capacity << std::endl;
|
||||
_zstream = malloc(sizeof(z_stream));
|
||||
z_stream * zstream = (z_stream*)_zstream;
|
||||
memset(zstream, 0, sizeof(z_stream));
|
||||
|
@ -44,6 +43,7 @@ GzipCompressStream::GzipCompressStream(OutputStream * stream, uint32_t bufferSiz
|
|||
|
||||
GzipCompressStream::~GzipCompressStream() {
|
||||
if (_zstream != NULL) {
|
||||
deflateEnd((z_stream*)_zstream);
|
||||
free(_zstream);
|
||||
_zstream = NULL;
|
||||
}
|
||||
|
@ -52,17 +52,13 @@ GzipCompressStream::~GzipCompressStream() {
|
|||
}
|
||||
|
||||
void GzipCompressStream::write(const void * buff, uint32_t length) {
|
||||
std::cout << "gzip " << length << std::endl;
|
||||
z_stream * zstream = (z_stream*)_zstream;
|
||||
zstream->next_in = (Bytef*)buff;
|
||||
zstream->avail_in = length;
|
||||
while (true) {
|
||||
int ret = deflate(zstream, Z_NO_FLUSH);
|
||||
std::cout << "gzip ret status " << ret << std::endl;
|
||||
if (ret == Z_OK) {
|
||||
std::cout << "gzip avail_out " << zstream->avail_out << std::endl;
|
||||
if (zstream->avail_out == 0) {
|
||||
std::cout << "gzip write capacity " << _capacity << std::endl;
|
||||
_stream->write(_buffer, _capacity);
|
||||
_compressedBytesWritten += _capacity;
|
||||
zstream->next_out = (Bytef *)_buffer;
|
||||
|
@ -79,7 +75,6 @@ void GzipCompressStream::write(const void * buff, uint32_t length) {
|
|||
}
|
||||
|
||||
void GzipCompressStream::flush() {
|
||||
std::cout << "gzip flush called";
|
||||
z_stream * zstream = (z_stream*)_zstream;
|
||||
while (true) {
|
||||
int ret = deflate(zstream, Z_FINISH);
|
||||
|
@ -111,7 +106,6 @@ void GzipCompressStream::resetState() {
|
|||
}
|
||||
|
||||
void GzipCompressStream::close() {
|
||||
std::cout << "gzip close called";
|
||||
if (!_finished) {
|
||||
flush();
|
||||
}
|
||||
|
@ -146,6 +140,7 @@ GzipDecompressStream::GzipDecompressStream(InputStream * stream, uint32_t buffer
|
|||
|
||||
GzipDecompressStream::~GzipDecompressStream() {
|
||||
if (_zstream != NULL) {
|
||||
inflateEnd((z_stream*)_zstream);
|
||||
free(_zstream);
|
||||
_zstream = NULL;
|
||||
}
|
||||
|
@ -173,11 +168,9 @@ int32_t GzipDecompressStream::read(void * buff, uint32_t length) {
|
|||
int ret = inflate(zstream, Z_NO_FLUSH);
|
||||
if (ret == Z_OK || ret == Z_STREAM_END) {
|
||||
if (zstream->avail_out == 0) {
|
||||
// printf("return %d\n", length);
|
||||
return length;
|
||||
}
|
||||
} else {
|
||||
// printf("Error: %d\n", ret);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -157,7 +157,7 @@ char * ReadBuffer::fillGet(uint32_t count) {
|
|||
_capacity = newcap;
|
||||
} else {
|
||||
if (_remain > 0) {
|
||||
memcpy(_buff, current(), _remain);
|
||||
memmove(_buff, current(), _remain);
|
||||
}
|
||||
}
|
||||
_size = _remain;
|
||||
|
|
|
@ -17,9 +17,9 @@
|
|||
*/
|
||||
|
||||
#include <errno.h>
|
||||
#include <fcntl.h>
|
||||
#include <dirent.h>
|
||||
#include <sys/stat.h>
|
||||
#include <jni.h>
|
||||
#include "commons.h"
|
||||
#include "util/StringUtil.h"
|
||||
#include "jniutils.h"
|
||||
|
@ -34,13 +34,12 @@ namespace NativeTask {
|
|||
/////////////////////////////////////////////////////////////
|
||||
|
||||
FileInputStream::FileInputStream(const string & path) {
|
||||
_handle = fopen(path.c_str(), "rb");
|
||||
if (_handle != NULL) {
|
||||
_fd = fileno(_handle);
|
||||
_fd = ::open(path.c_str(), O_RDONLY);
|
||||
if (_fd >= 0) {
|
||||
_path = path;
|
||||
} else {
|
||||
_fd = -1;
|
||||
THROW_EXCEPTION_EX(IOException, "Can't open raw file: [%s]", path.c_str());
|
||||
THROW_EXCEPTION_EX(IOException, "Can't open file for read: [%s]", path.c_str());
|
||||
}
|
||||
_bytesRead = NativeObjectFactory::GetCounter(TaskCounters::FILESYSTEM_COUNTER_GROUP,
|
||||
TaskCounters::FILE_BYTES_READ);
|
||||
|
@ -67,9 +66,8 @@ int32_t FileInputStream::read(void * buff, uint32_t length) {
|
|||
}
|
||||
|
||||
void FileInputStream::close() {
|
||||
if (_handle != NULL) {
|
||||
fclose(_handle);
|
||||
_handle = NULL;
|
||||
if (_fd >= 0) {
|
||||
::close(_fd);
|
||||
_fd = -1;
|
||||
}
|
||||
}
|
||||
|
@ -77,13 +75,20 @@ void FileInputStream::close() {
|
|||
/////////////////////////////////////////////////////////////
|
||||
|
||||
FileOutputStream::FileOutputStream(const string & path, bool overwite) {
|
||||
_handle = fopen(path.c_str(), "wb");
|
||||
if (_handle != NULL) {
|
||||
_fd = fileno(_handle);
|
||||
int flags = 0;
|
||||
if (overwite) {
|
||||
flags = O_WRONLY | O_CREAT | O_TRUNC;
|
||||
} else {
|
||||
flags = O_WRONLY | O_CREAT | O_EXCL;
|
||||
}
|
||||
mode_t mask = umask(0);
|
||||
umask(mask);
|
||||
_fd = ::open(path.c_str(), flags, (0666 & ~mask));
|
||||
if (_fd >= 0) {
|
||||
_path = path;
|
||||
} else {
|
||||
_fd = -1;
|
||||
THROW_EXCEPTION_EX(IOException, "Open raw file failed: [%s]", path.c_str());
|
||||
THROW_EXCEPTION_EX(IOException, "Can't open file for write: [%s]", path.c_str());
|
||||
}
|
||||
_bytesWrite = NativeObjectFactory::GetCounter(TaskCounters::FILESYSTEM_COUNTER_GROUP,
|
||||
TaskCounters::FILE_BYTES_WRITTEN);
|
||||
|
@ -108,9 +113,8 @@ void FileOutputStream::flush() {
|
|||
}
|
||||
|
||||
void FileOutputStream::close() {
|
||||
if (_handle != NULL) {
|
||||
fclose(_handle);
|
||||
_handle = NULL;
|
||||
if (_fd >= 0) {
|
||||
::close(_fd);
|
||||
_fd = -1;
|
||||
}
|
||||
}
|
||||
|
@ -251,28 +255,8 @@ extern RawFileSystem RawFileSystemInstance;
|
|||
|
||||
RawFileSystem RawFileSystemInstance = RawFileSystem();
|
||||
|
||||
string FileSystem::getDefaultFsUri(Config * config) {
|
||||
const char * nm = config->get(FS_DEFAULT_NAME);
|
||||
if (nm == NULL) {
|
||||
nm = config->get("fs.defaultFS");
|
||||
}
|
||||
if (nm == NULL) {
|
||||
return string("file:///");
|
||||
} else {
|
||||
return string(nm);
|
||||
}
|
||||
}
|
||||
|
||||
FileSystem & FileSystem::getLocal() {
|
||||
return RawFileSystemInstance;
|
||||
}
|
||||
|
||||
|
||||
FileSystem & FileSystem::get(Config * config) {
|
||||
string uri = getDefaultFsUri(config);
|
||||
if (uri == "file:///") {
|
||||
return RawFileSystemInstance;
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace Hadoap
|
||||
|
|
|
@ -34,7 +34,6 @@ class FileSystem;
|
|||
class FileInputStream : public InputStream {
|
||||
private:
|
||||
string _path;
|
||||
FILE * _handle;
|
||||
int _fd;
|
||||
Counter * _bytesRead;
|
||||
public:
|
||||
|
@ -57,7 +56,6 @@ public:
|
|||
class FileOutputStream : public OutputStream {
|
||||
private:
|
||||
string _path;
|
||||
FILE * _handle;
|
||||
int _fd;
|
||||
Counter * _bytesWrite;
|
||||
public:
|
||||
|
@ -117,10 +115,7 @@ public:
|
|||
virtual void mkdirs(const string & path) {
|
||||
}
|
||||
|
||||
static string getDefaultFsUri(Config * config);
|
||||
static FileSystem & getLocal();
|
||||
static FileSystem & getJava(Config * config);
|
||||
static FileSystem & get(Config * config);
|
||||
};
|
||||
|
||||
} // namespace NativeTask
|
||||
|
|
|
@ -69,6 +69,10 @@ private:
|
|||
public:
|
||||
MemoryBlock(char * pos, uint32_t size);
|
||||
|
||||
char * base() {
|
||||
return _base;
|
||||
}
|
||||
|
||||
bool sorted() {
|
||||
return _sorted;
|
||||
}
|
||||
|
|
|
@ -78,7 +78,7 @@ Config * NativeObjectFactory::GlobalConfig = &G_CONFIG;
|
|||
float NativeObjectFactory::LastProgress = 0;
|
||||
Progress * NativeObjectFactory::TaskProgress = NULL;
|
||||
string NativeObjectFactory::LastStatus;
|
||||
set<Counter *> NativeObjectFactory::CounterSet;
|
||||
set<Counter *, CounterPtrCompare> NativeObjectFactory::CounterSet;
|
||||
vector<Counter *> NativeObjectFactory::Counters;
|
||||
vector<uint64_t> NativeObjectFactory::CounterLastUpdateValues;
|
||||
bool NativeObjectFactory::Inited = false;
|
||||
|
|
|
@ -36,6 +36,19 @@ using std::pair;
|
|||
|
||||
class NativeLibrary;
|
||||
|
||||
class CounterPtrCompare {
|
||||
public:
|
||||
bool operator()(const Counter * lhs, const Counter * rhs) const {
|
||||
if (lhs->group() < rhs->group()) {
|
||||
return true;
|
||||
} else if (lhs->group() == rhs->group()) {
|
||||
return lhs->name() < rhs->name();
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Native object factory
|
||||
*/
|
||||
|
@ -47,7 +60,7 @@ private:
|
|||
static float LastProgress;
|
||||
static Progress * TaskProgress;
|
||||
static string LastStatus;
|
||||
static set<Counter *> CounterSet;
|
||||
static set<Counter *, CounterPtrCompare> CounterSet;
|
||||
static vector<Counter *> Counters;
|
||||
static vector<uint64_t> CounterLastUpdateValues;
|
||||
static bool Inited;
|
||||
|
|
|
@ -43,9 +43,11 @@ PartitionBucketIterator::PartitionBucketIterator(PartitionBucket * pb, Comparato
|
|||
MemBlockIteratorPtr blockIterator = new MemBlockIterator(block);
|
||||
if (blockIterator->next()) {
|
||||
_heap.push_back(blockIterator);
|
||||
} else {
|
||||
delete blockIterator;
|
||||
}
|
||||
}
|
||||
if (_heap.size() > 0) {
|
||||
if (_heap.size() > 1) {
|
||||
makeHeap(&(_heap[0]), &(_heap[0]) + _heap.size(), _comparator);
|
||||
}
|
||||
}
|
||||
|
@ -78,6 +80,9 @@ bool PartitionBucketIterator::next() {
|
|||
heapify(base, 1, cur_heap_size, _comparator);
|
||||
}
|
||||
} else { // no more, pop heap
|
||||
// after popHeap, the first element of heap will be removed
|
||||
// and replaced by other element, so it needs to be deleted
|
||||
delete _heap[0];
|
||||
MemBlockIteratorPtr * base = &(_heap[0]);
|
||||
popHeap(base, base + cur_heap_size, _comparator);
|
||||
_heap.pop_back();
|
||||
|
|
|
@ -61,22 +61,17 @@ string StringUtil::ToString(bool v) {
|
|||
}
|
||||
|
||||
string StringUtil::ToString(float v) {
|
||||
char tmp[32];
|
||||
snprintf(tmp, 32, "%f", v);
|
||||
return tmp;
|
||||
return Format("%f", v);
|
||||
}
|
||||
|
||||
string StringUtil::ToString(double v) {
|
||||
char tmp[32];
|
||||
snprintf(tmp, 32, "%lf", v);
|
||||
return tmp;
|
||||
return Format("%lf", v);
|
||||
}
|
||||
|
||||
string StringUtil::ToString(const void * v, uint32_t len) {
|
||||
string StringUtil::ToHexString(const void * v, uint32_t len) {
|
||||
string ret = string(len * 2, '0');
|
||||
for (uint32_t i = 0; i < len; i++) {
|
||||
ret[i] = (((uint8_t*)v)[i] >> 4) + '0';
|
||||
ret[i] = (((uint8_t*)v)[i] & 0xff) + '0';
|
||||
snprintf(&(ret[i*2]), 3, "%02x", ((char*)v)[i]);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -110,7 +105,7 @@ string StringUtil::Format(const char * fmt, ...) {
|
|||
len = vsnprintf(destbuff, len + 1, fmt, al);
|
||||
va_end(al);
|
||||
dest.append(destbuff, len);
|
||||
delete destbuff;
|
||||
delete [] destbuff;
|
||||
} else {
|
||||
dest.append(tmp, len);
|
||||
}
|
||||
|
|
|
@ -38,7 +38,7 @@ public:
|
|||
static string ToString(bool v);
|
||||
static string ToString(float v);
|
||||
static string ToString(double v);
|
||||
static string ToString(const void * v, uint32_t len);
|
||||
static string ToHexString(const void * v, uint32_t len);
|
||||
|
||||
static int64_t toInt(const string & str);
|
||||
static bool toBool(const string & str);
|
||||
|
|
|
@ -297,7 +297,7 @@ void WritableUtils::toString(string & dest, KeyValueType type, const void * data
|
|||
dest.append(StringUtil::ToString(*(double*)data));
|
||||
break;
|
||||
case MD5HashType:
|
||||
dest.append(StringUtil::ToString(data, length));
|
||||
dest.append(StringUtil::ToHexString(data, length));
|
||||
break;
|
||||
default:
|
||||
dest.append((const char*)data, length);
|
||||
|
|
|
@ -25,14 +25,14 @@
|
|||
using namespace NativeTask;
|
||||
|
||||
TEST(Command, equals) {
|
||||
Command * cmd1 = new Command(100, "hello command");
|
||||
Command * cmd2 = new Command(100, "hello command 2");
|
||||
Command cmd1(100, "hello command");
|
||||
Command cmd2(100, "hello command 2");
|
||||
|
||||
ASSERT_TRUE(cmd1->equals(*cmd2));
|
||||
ASSERT_TRUE(cmd2->equals(*cmd1));
|
||||
ASSERT_EQ(100, cmd1->id());
|
||||
ASSERT_TRUE(cmd1.equals(cmd2));
|
||||
ASSERT_TRUE(cmd2.equals(cmd1));
|
||||
ASSERT_EQ(100, cmd1.id());
|
||||
|
||||
std::string helloCommand = "hello command";
|
||||
ASSERT_EQ(0, helloCommand.compare(cmd1->description()));
|
||||
ASSERT_EQ(helloCommand, cmd1.description());
|
||||
}
|
||||
|
||||
|
|
|
@ -17,11 +17,12 @@
|
|||
*/
|
||||
|
||||
#include "commons.h"
|
||||
#include "NativeObjectFactory.h"
|
||||
#include "BufferStream.h"
|
||||
#include "Buffers.h"
|
||||
#include "test_commons.h"
|
||||
|
||||
TEST(Counter, test) {
|
||||
TEST(Counter, Counter) {
|
||||
Counter counter1("group", "key");
|
||||
const string & group = counter1.group();
|
||||
const string & name = counter1.name();
|
||||
|
@ -33,3 +34,15 @@ TEST(Counter, test) {
|
|||
counter1.increase(100);
|
||||
ASSERT_EQ(100, counter1.get());
|
||||
}
|
||||
|
||||
TEST(Counter, CounterSet) {
|
||||
Counter * counter1 = NativeObjectFactory::GetCounter("group0", "name0");
|
||||
ASSERT_EQ(string("group0"), counter1->group());
|
||||
ASSERT_EQ(string("name0"), counter1->name());
|
||||
counter1->increase(100);
|
||||
ASSERT_EQ(100, counter1->get());
|
||||
Counter * counter2 = NativeObjectFactory::GetCounter("group0", "name0");
|
||||
Counter * counter3 = NativeObjectFactory::GetCounter("group0", "name1");
|
||||
ASSERT_EQ(counter1, counter2);
|
||||
ASSERT_NE(counter1, counter3);
|
||||
}
|
||||
|
|
|
@ -48,34 +48,3 @@ TEST(FileSystem, RawFileSystem) {
|
|||
ASSERT_FALSE(fs.exists(temppath));
|
||||
}
|
||||
|
||||
// This test needs java CLASSPATH env to run
|
||||
// Enable it manually only if some changes are made to FileSystem.h/cc
|
||||
//TEST(FileSystem, JavaFileSystem) {
|
||||
// FileSystem & fs = FileSystem::getJava(TestConfig);
|
||||
// fs.mkdirs("temp");
|
||||
// string temppath = "temp/data";
|
||||
// string content;
|
||||
// GenerateKVTextLength(content, 4111111, "word");
|
||||
// FileOutputStream * output = (FileOutputStream*)fs.create(temppath, true);
|
||||
// output->write(content.data(), content.length());
|
||||
// output->close();
|
||||
// delete output;
|
||||
// FileInputStream * input = (FileInputStream*)fs.open(temppath);
|
||||
// char buff[102400];
|
||||
// int64_t total = 0;
|
||||
// while(true) {
|
||||
// int rd = input->read(buff, 102400);
|
||||
// if (rd<=0) {
|
||||
// break;
|
||||
// }
|
||||
// ASSERT_EQ(content.substr(total, rd), string(buff,rd));
|
||||
// total+=rd;
|
||||
// }
|
||||
// ASSERT_EQ(content.length(), total);
|
||||
// delete input;
|
||||
// ASSERT_EQ(fs.getLength(temppath), content.length());
|
||||
// ASSERT_TRUE(fs.exists(temppath));
|
||||
// fs.remove("temp");
|
||||
// ASSERT_FALSE(fs.exists(temppath));
|
||||
//}
|
||||
|
||||
|
|
|
@ -67,6 +67,7 @@ void TestIFileReadWrite(KeyValueType kvtype, int partition, int size,
|
|||
vector<pair<string, string> > readkvs;
|
||||
readIFile(readkvs, outputpath, kvtype, info, codec);
|
||||
LOG("read finished");
|
||||
delete info;
|
||||
ASSERT_EQ(kvs.size() * partition, readkvs.size());
|
||||
for (int i = 0; i < partition; i++) {
|
||||
vector<pair<string, string> > cur_part(readkvs.begin() + i * kvs.size(),
|
||||
|
@ -171,11 +172,11 @@ TEST(IFile, TestGlibCBug) {
|
|||
LOG("TestGlibCBug %s", path.c_str());
|
||||
IFileSegment * segments = new IFileSegment[1];
|
||||
segments[0].realEndOffset = 10000000;
|
||||
SingleSpillInfo * info = new SingleSpillInfo(segments, 1, path, CHECKSUM_NONE,
|
||||
SingleSpillInfo info(segments, 1, path, CHECKSUM_NONE,
|
||||
IntType, TextType, "");
|
||||
|
||||
InputStream * fileOut = FileSystem::getLocal().open(path);
|
||||
IFileReader * reader = new IFileReader(fileOut, info, true);
|
||||
IFileReader * reader = new IFileReader(fileOut, &info, true);
|
||||
|
||||
const char * key = NULL;
|
||||
uint32_t length = 0;
|
||||
|
@ -187,4 +188,6 @@ TEST(IFile, TestGlibCBug) {
|
|||
ASSERT_EQ(expect[index], realKey);
|
||||
index++;
|
||||
}
|
||||
delete reader;
|
||||
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@
|
|||
#include "commons.h"
|
||||
#include "Buffers.h"
|
||||
#include "FileSystem.h"
|
||||
#include "NativeObjectFactory.h"
|
||||
#include "test_commons.h"
|
||||
|
||||
extern "C" {
|
||||
|
@ -75,6 +76,7 @@ int main(int argc, char ** argv) {
|
|||
int skip = gen ? 2 : 1;
|
||||
TestConfig.parse(argc - skip, (const char **)(newArgv + skip));
|
||||
}
|
||||
delete [] newArgv;
|
||||
try {
|
||||
if (gen == true) {
|
||||
string type = TestConfig.get("generate.type", "word");
|
||||
|
@ -92,12 +94,16 @@ int main(int argc, char ** argv) {
|
|||
fout->close();
|
||||
delete fout;
|
||||
}
|
||||
NativeObjectFactory::Release();
|
||||
return 0;
|
||||
} else {
|
||||
return RUN_ALL_TESTS();
|
||||
int ret = RUN_ALL_TESTS();
|
||||
NativeObjectFactory::Release();
|
||||
return ret;
|
||||
}
|
||||
} catch (std::exception & e) {
|
||||
fprintf(stderr, "Exception: %s", e.what());
|
||||
NativeObjectFactory::Release();
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -42,5 +42,6 @@ TEST(ByteBuffer, read) {
|
|||
ASSERT_EQ(20, byteBuffer.limit());
|
||||
|
||||
ASSERT_EQ(10, byteBuffer.position());
|
||||
delete [] buff;
|
||||
}
|
||||
} /* namespace NativeTask */
|
||||
|
|
|
@ -83,6 +83,7 @@ TEST(ComparatorForDualPivotQuickSort, compare) {
|
|||
compareResult = -1;
|
||||
|
||||
ASSERT_EQ(-1, comparator((char * )kv1 - buff, (char * )kv2 - buff));
|
||||
delete [] buff;
|
||||
}
|
||||
|
||||
} /* namespace NativeTask */
|
||||
|
|
|
@ -83,6 +83,7 @@ TEST(ComparatorForStdSort, compare) {
|
|||
compareResult = -1;
|
||||
|
||||
ASSERT_EQ(true, comparator((char * )kv1 - buff, (char * )kv2 - buff));
|
||||
delete [] buff;
|
||||
}
|
||||
|
||||
} /* namespace NativeTask */
|
||||
|
|
|
@ -47,6 +47,9 @@ TEST(FixSizeContainer, test) {
|
|||
char * c = container->base() + i;
|
||||
ASSERT_EQ(toBeFilled[i], *c);
|
||||
}
|
||||
|
||||
delete [] bytes;
|
||||
delete container;
|
||||
}
|
||||
|
||||
} /* namespace NativeTask */
|
||||
|
|
|
@ -24,28 +24,26 @@
|
|||
namespace NativeTask {
|
||||
|
||||
class MockIterator : public KVIterator {
|
||||
std::vector<std::pair<int, int> > * kvs;
|
||||
std::vector<std::pair<int, int> > kvs;
|
||||
uint32_t index;
|
||||
uint32_t expectedKeyGroupNum;
|
||||
std::map<int, int> expectkeyCountMap;
|
||||
char * buffer;
|
||||
char buffer[8];
|
||||
|
||||
public:
|
||||
MockIterator()
|
||||
: index(0), buffer(NULL) {
|
||||
buffer = new char[8];
|
||||
kvs = new std::vector<std::pair<int, int> >();
|
||||
kvs->push_back(std::pair<int, int>(10, 100));
|
||||
: index(0) {
|
||||
kvs.push_back(std::pair<int, int>(10, 100));
|
||||
|
||||
kvs->push_back(std::pair<int, int>(10, 100));
|
||||
kvs->push_back(std::pair<int, int>(10, 101));
|
||||
kvs->push_back(std::pair<int, int>(10, 102));
|
||||
kvs.push_back(std::pair<int, int>(10, 100));
|
||||
kvs.push_back(std::pair<int, int>(10, 101));
|
||||
kvs.push_back(std::pair<int, int>(10, 102));
|
||||
|
||||
kvs->push_back(std::pair<int, int>(20, 200));
|
||||
kvs->push_back(std::pair<int, int>(20, 201));
|
||||
kvs->push_back(std::pair<int, int>(20, 202));
|
||||
kvs->push_back(std::pair<int, int>(30, 302));
|
||||
kvs->push_back(std::pair<int, int>(40, 302));
|
||||
kvs.push_back(std::pair<int, int>(20, 200));
|
||||
kvs.push_back(std::pair<int, int>(20, 201));
|
||||
kvs.push_back(std::pair<int, int>(20, 202));
|
||||
kvs.push_back(std::pair<int, int>(30, 302));
|
||||
kvs.push_back(std::pair<int, int>(40, 302));
|
||||
this->expectedKeyGroupNum = 4;
|
||||
|
||||
expectkeyCountMap[10] = 4;
|
||||
|
@ -55,8 +53,8 @@ public:
|
|||
}
|
||||
|
||||
bool next(Buffer & key, Buffer & outValue) {
|
||||
if (index < kvs->size()) {
|
||||
std::pair<int, int> value = kvs->at(index);
|
||||
if (index < kvs.size()) {
|
||||
std::pair<int, int> value = kvs.at(index);
|
||||
*((int *)buffer) = value.first;
|
||||
*(((int *)buffer) + 1) = value.second;
|
||||
key.reset(buffer, 4);
|
||||
|
@ -88,11 +86,9 @@ void TestKeyGroupIterator() {
|
|||
uint32_t length = 0;
|
||||
key = groupIterator->getKey(length);
|
||||
int * keyPtr = (int *)key;
|
||||
std::cout << "new key group(key group hold kvs of same key): " << *keyPtr << std::endl;
|
||||
const char * value = NULL;
|
||||
while (NULL != (value = groupIterator->nextValue(length))) {
|
||||
int * valuePtr = (int *)value;
|
||||
std::cout << "==== key: " << *keyPtr << "value: " << *valuePtr << std::endl;
|
||||
|
||||
if (actualKeyCount.find(*keyPtr) == actualKeyCount.end()) {
|
||||
actualKeyCount[*keyPtr] = 0;
|
||||
|
@ -108,8 +104,8 @@ void TestKeyGroupIterator() {
|
|||
uint32_t expectedCount = expectedKeyCountMap[key];
|
||||
ASSERT_EQ(expectedCount, keyCountIter->second);
|
||||
}
|
||||
|
||||
std::cout << "Done!!!!!!! " << std::endl;
|
||||
delete groupIterator;
|
||||
delete iter;
|
||||
}
|
||||
|
||||
TEST(Iterator, keyGroupIterator) {
|
||||
|
|
|
@ -48,6 +48,7 @@ TEST(KVBuffer, test) {
|
|||
|
||||
ASSERT_EQ(8, kv1->headerLength());
|
||||
ASSERT_EQ(strlen(KEY) + strlen(VALUE) + 8, kv1->lengthConvertEndium());
|
||||
delete [] buff;
|
||||
}
|
||||
|
||||
} /* namespace NativeTask */
|
||||
|
|
|
@ -43,6 +43,7 @@ TEST(MemoryBlockIterator, test) {
|
|||
ASSERT_EQ(block.getKVBuffer(keyCount), kv);
|
||||
keyCount++;
|
||||
}
|
||||
delete [] bytes;
|
||||
}
|
||||
|
||||
class MemoryBlockFactory {
|
||||
|
@ -102,6 +103,13 @@ TEST(MemoryBlockIterator, compare) {
|
|||
|
||||
iter2->next();
|
||||
ASSERT_EQ(false, comparator(iter1, iter2));
|
||||
|
||||
delete iter2;
|
||||
delete iter1;
|
||||
delete [] block2->base();
|
||||
delete [] block1->base();
|
||||
delete block2;
|
||||
delete block1;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -47,6 +47,7 @@ TEST(MemoryBlock, test) {
|
|||
|
||||
ASSERT_EQ(BUFFER_LENGTH - 2 * KV_SIZE, block.remainSpace());
|
||||
ASSERT_EQ(false, block.sorted());
|
||||
delete [] bytes;
|
||||
}
|
||||
|
||||
TEST(MemoryBlock, overflow) {
|
||||
|
@ -64,6 +65,7 @@ TEST(MemoryBlock, overflow) {
|
|||
ASSERT_EQ(1, block.getKVCount());
|
||||
|
||||
ASSERT_EQ(BUFFER_LENGTH - KV_SIZE, block.remainSpace());
|
||||
delete [] bytes;
|
||||
}
|
||||
|
||||
TEST(MemoryBlock, sort) {
|
||||
|
@ -101,6 +103,7 @@ TEST(MemoryBlock, sort) {
|
|||
ASSERT_EQ(small, block.getKVBuffer(0));
|
||||
ASSERT_EQ(medium, block.getKVBuffer(1));
|
||||
ASSERT_EQ(big, block.getKVBuffer(2));
|
||||
delete [] bytes;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -83,17 +83,28 @@ TEST(PartitionBucket, multipleMemoryBlock) {
|
|||
|
||||
const uint32_t KV_SIZE = 700;
|
||||
const uint32_t SMALL_KV_SIZE = 100;
|
||||
// To suppress valgrind error
|
||||
// the allocated buffer needs to be initialized before
|
||||
// create iterator on the PartitionBucker, because
|
||||
// those memory will be compared when create minheap
|
||||
KVBuffer * kv1 = bucket->allocateKVBuffer(KV_SIZE);
|
||||
memset(kv1, 0, KV_SIZE);
|
||||
KVBuffer * kv2 = bucket->allocateKVBuffer(SMALL_KV_SIZE);
|
||||
memset(kv2, 0, SMALL_KV_SIZE);
|
||||
KVBuffer * kv3 = bucket->allocateKVBuffer(KV_SIZE);
|
||||
memset(kv3, 0, KV_SIZE);
|
||||
|
||||
ASSERT_EQ(3, bucket->getKVCount());
|
||||
KVIterator * NULLPOINTER = 0;
|
||||
ASSERT_NE(NULLPOINTER, bucket->getIterator());
|
||||
KVIterator * iter = bucket->getIterator();
|
||||
ASSERT_NE(NULLPOINTER, iter);
|
||||
delete iter;
|
||||
ASSERT_EQ(2, bucket->getMemoryBlockCount());
|
||||
|
||||
bucket->reset();
|
||||
ASSERT_EQ(NULLPOINTER, bucket->getIterator());
|
||||
iter = bucket->getIterator();
|
||||
ASSERT_EQ(NULLPOINTER, iter);
|
||||
delete iter;
|
||||
ASSERT_EQ(0, bucket->getMemoryBlockCount());
|
||||
|
||||
delete bucket;
|
||||
|
@ -206,6 +217,7 @@ TEST(PartitionBucket, spill) {
|
|||
ASSERT_EQ(KV_SIZE - KVBuffer::headerLength() - 4, third->valueLength);
|
||||
ASSERT_EQ(bswap(BIG), (*(uint32_t * )(third->getKey())));
|
||||
|
||||
delete [] buff;
|
||||
delete bucket;
|
||||
delete pool;
|
||||
}
|
||||
|
|
|
@ -24,7 +24,7 @@
|
|||
TEST(Buffers, AppendRead) {
|
||||
string codec = "";
|
||||
vector<string> data;
|
||||
Generate(data, 1000000, "word");
|
||||
Generate(data, 100000, "word");
|
||||
string dest;
|
||||
dest.reserve(64 * 1024 * 1024);
|
||||
OutputStringStream outputStream = OutputStringStream(dest);
|
||||
|
@ -46,7 +46,7 @@ TEST(Buffers, AppendRead) {
|
|||
TEST(Buffers, AppendReadSnappy) {
|
||||
string codec = "org.apache.hadoop.io.compress.SnappyCodec";
|
||||
vector<string> data;
|
||||
Generate(data, 1000000, "word");
|
||||
Generate(data, 100000, "word");
|
||||
string dest;
|
||||
dest.reserve(64 * 1024 * 1024);
|
||||
OutputStringStream outputStream = OutputStringStream(dest);
|
||||
|
|
|
@ -42,21 +42,19 @@ TEST(ReadWriteBuffer, readAndWrite) {
|
|||
}
|
||||
|
||||
uint32_t writePoint = buff.getWritePoint();
|
||||
LOG("Current Write Point: %d", writePoint);
|
||||
|
||||
for (int i = 0; i < REPEAT; i++) {
|
||||
ASSERT_EQ(INT, buff.readInt());
|
||||
ASSERT_EQ(LONG, buff.readLong());
|
||||
string * read = buff.readString();
|
||||
LOG("READ STRING: %s", read->c_str());
|
||||
ASSERT_EQ(0, STR.compare(read->c_str()));
|
||||
delete read;
|
||||
|
||||
ASSERT_EQ(POINTER, buff.readPointer());
|
||||
|
||||
read = buff.readString();
|
||||
LOG("READ STRING: %s", read->c_str());
|
||||
ASSERT_EQ(0, STR.compare(read->c_str()));
|
||||
delete read;
|
||||
}
|
||||
|
||||
uint32_t readPoint = buff.getReadPoint();
|
||||
|
|
|
@ -27,6 +27,11 @@ TEST(StringUtil, Convertion) {
|
|||
ASSERT_EQ(StringUtil::ToString(111, ' ', 40), " 111");
|
||||
}
|
||||
|
||||
TEST(StringUtil, ToHexString) {
|
||||
uint8_t buff[4] = {'i', 'j', 'k', 'l'};
|
||||
ASSERT_EQ(StringUtil::ToHexString(buff, 4), string("696a6b6c"));
|
||||
}
|
||||
|
||||
TEST(StringUtil, Format) {
|
||||
string t = StringUtil::Format("%d %d %d %.3lf %s", 1, 2, 3, 1.333, "aaaaaaaaaaa");
|
||||
ASSERT_EQ(t, "1 2 3 1.333 aaaaaaaaaaa");
|
||||
|
|
Loading…
Reference in New Issue