From f2933f253588968318fb338474b57cd51526d2a9 Mon Sep 17 00:00:00 2001 From: Colin McCabe Date: Thu, 15 May 2014 00:29:41 +0000 Subject: [PATCH] HDFS-6287. Add vecsum test of libhdfs read access times (cmccabe) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1594751 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../hadoop-hdfs/src/CMakeLists.txt | 12 + .../hadoop-hdfs/src/config.h.cmake | 2 + .../src/main/native/libhdfs/test/vecsum.c | 804 ++++++++++++++++++ .../native/libhdfs/test_libhdfs_threaded.c | 2 +- 5 files changed, 821 insertions(+), 1 deletion(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/vecsum.c diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 82024f4daae..f1fc5dfc9f0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -361,6 +361,8 @@ Release 2.5.0 - UNRELEASED HDFS-2949. Add check to active state transition to prevent operator-induced split brain. (Rushabh S Shah via kihwal) + HDFS-6287. Add vecsum test of libhdfs read access times (cmccabe) + OPTIMIZATIONS HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt index 82d1a324f05..fc5ebea4c0f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt @@ -62,6 +62,9 @@ endfunction() INCLUDE(CheckCSourceCompiles) CHECK_C_SOURCE_COMPILES("int main(void) { static __thread int i = 0; return 0; }" HAVE_BETTER_TLS) +# Check to see if we have Intel SSE intrinsics. +CHECK_C_SOURCE_COMPILES("#include \nint main(void) { __m128d sum0 = _mm_set_pd(0.0,0.0); return 0; }" HAVE_INTEL_SSE_INTRINSICS) + # Check if we need to link dl library to get dlopen. # dlopen on Linux is in separate library but on FreeBSD its in libc INCLUDE(CheckLibraryExists) @@ -170,6 +173,15 @@ target_link_libraries(test_libhdfs_zerocopy pthread ) +add_executable(test_libhdfs_vecsum + main/native/libhdfs/test/vecsum.c +) +target_link_libraries(test_libhdfs_vecsum + hdfs + pthread + rt +) + IF(REQUIRE_LIBWEBHDFS) add_subdirectory(contrib/libwebhdfs) ENDIF(REQUIRE_LIBWEBHDFS) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/config.h.cmake b/hadoop-hdfs-project/hadoop-hdfs/src/config.h.cmake index ac0b5308cc3..0d11fc4623d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/config.h.cmake +++ b/hadoop-hdfs-project/hadoop-hdfs/src/config.h.cmake @@ -22,4 +22,6 @@ #cmakedefine HAVE_BETTER_TLS +#cmakedefine HAVE_INTEL_SSE_INTRINSICS + #endif diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/vecsum.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/vecsum.c new file mode 100644 index 00000000000..93d205ec780 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/vecsum.c @@ -0,0 +1,804 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "config.h" +#include "hdfs.h" + +#define VECSUM_CHUNK_SIZE (8 * 1024 * 1024) +#define ZCR_READ_CHUNK_SIZE (1024 * 1024 * 8) +#define NORMAL_READ_CHUNK_SIZE (8 * 1024 * 1024) +#define DOUBLES_PER_LOOP_ITER 16 + +static double timespec_to_double(const struct timespec *ts) +{ + double sec = ts->tv_sec; + double nsec = ts->tv_nsec; + return sec + (nsec / 1000000000L); +} + +struct stopwatch { + struct timespec start; + struct timespec stop; + struct rusage rusage; +}; + +static struct stopwatch *stopwatch_create(void) +{ + struct stopwatch *watch; + + watch = calloc(1, sizeof(struct stopwatch)); + if (!watch) { + fprintf(stderr, "failed to allocate memory for stopwatch\n"); + goto error; + } + if (clock_gettime(CLOCK_MONOTONIC, &watch->start)) { + int err = errno; + fprintf(stderr, "clock_gettime(CLOCK_MONOTONIC) failed with " + "error %d (%s)\n", err, strerror(err)); + goto error; + } + if (getrusage(RUSAGE_THREAD, &watch->rusage) < 0) { + int err = errno; + fprintf(stderr, "getrusage failed: error %d (%s)\n", + err, strerror(err)); + goto error; + } + return watch; + +error: + free(watch); + return NULL; +} + +static void stopwatch_stop(struct stopwatch *watch, + long long bytes_read) +{ + double elapsed, rate; + + if (clock_gettime(CLOCK_MONOTONIC, &watch->stop)) { + int err = errno; + fprintf(stderr, "clock_gettime(CLOCK_MONOTONIC) failed with " + "error %d (%s)\n", err, strerror(err)); + goto done; + } + elapsed = timespec_to_double(&watch->stop) - + timespec_to_double(&watch->start); + rate = (bytes_read / elapsed) / (1024 * 1024 * 1024); + printf("stopwatch: took %.5g seconds to read %lld bytes, " + "for %.5g GB/s\n", elapsed, bytes_read, rate); + printf("stopwatch: %.5g seconds\n", elapsed); +done: + free(watch); +} + +enum vecsum_type { + VECSUM_LOCAL = 0, + VECSUM_LIBHDFS, + VECSUM_ZCR, +}; + +#define VECSUM_TYPE_VALID_VALUES "libhdfs, zcr, or local" + +int parse_vecsum_type(const char *str) +{ + if (strcasecmp(str, "local") == 0) + return VECSUM_LOCAL; + else if (strcasecmp(str, "libhdfs") == 0) + return VECSUM_LIBHDFS; + else if (strcasecmp(str, "zcr") == 0) + return VECSUM_ZCR; + else + return -1; +} + +struct options { + // The path to read. + const char *path; + + // Length of the file. + long long length; + + // The number of times to read the path. + int passes; + + // Type of vecsum to do + enum vecsum_type ty; + + // RPC address to use for HDFS + const char *rpc_address; +}; + +static struct options *options_create(void) +{ + struct options *opts = NULL; + const char *pass_str; + const char *ty_str; + const char *length_str; + int ty; + + opts = calloc(1, sizeof(struct options)); + if (!opts) { + fprintf(stderr, "failed to calloc options\n"); + goto error; + } + opts->path = getenv("VECSUM_PATH"); + if (!opts->path) { + fprintf(stderr, "You must set the VECSUM_PATH environment " + "variable to the path of the file to read.\n"); + goto error; + } + length_str = getenv("VECSUM_LENGTH"); + if (!length_str) { + length_str = "2147483648"; + } + opts->length = atoll(length_str); + if (!opts->length) { + fprintf(stderr, "Can't parse VECSUM_LENGTH of '%s'.\n", + length_str); + goto error; + } + if (opts->length % VECSUM_CHUNK_SIZE) { + fprintf(stderr, "VECSUM_LENGTH must be a multiple of '%lld'. The " + "currently specified length of '%lld' is not.\n", + (long long)VECSUM_CHUNK_SIZE, (long long)opts->length); + goto error; + } + pass_str = getenv("VECSUM_PASSES"); + if (!pass_str) { + fprintf(stderr, "You must set the VECSUM_PASSES environment " + "variable to the number of passes to make.\n"); + goto error; + } + opts->passes = atoi(pass_str); + if (opts->passes <= 0) { + fprintf(stderr, "Invalid value for the VECSUM_PASSES " + "environment variable. You must set this to a " + "number greater than 0.\n"); + goto error; + } + ty_str = getenv("VECSUM_TYPE"); + if (!ty_str) { + fprintf(stderr, "You must set the VECSUM_TYPE environment " + "variable to " VECSUM_TYPE_VALID_VALUES "\n"); + goto error; + } + ty = parse_vecsum_type(ty_str); + if (ty < 0) { + fprintf(stderr, "Invalid VECSUM_TYPE environment variable. " + "Valid values are " VECSUM_TYPE_VALID_VALUES "\n"); + goto error; + } + opts->ty = ty; + opts->rpc_address = getenv("VECSUM_RPC_ADDRESS"); + if (!opts->rpc_address) { + opts->rpc_address = "default"; + } + return opts; +error: + free(opts); + return NULL; +} + +static int test_file_chunk_setup(double **chunk) +{ + int i; + double *c, val; + + c = malloc(VECSUM_CHUNK_SIZE); + if (!c) { + fprintf(stderr, "test_file_create: failed to malloc " + "a buffer of size '%lld'\n", + (long long) VECSUM_CHUNK_SIZE); + return EIO; + } + val = 0.0; + for (i = 0; i < VECSUM_CHUNK_SIZE / sizeof(double); i++) { + c[i] = val; + val += 0.5; + } + *chunk = c; + return 0; +} + +static void options_free(struct options *opts) +{ + free(opts); +} + +struct local_data { + int fd; + double *mmap; + long long length; +}; + +static int local_data_create_file(struct local_data *cdata, + const struct options *opts) +{ + int ret = EIO; + int dup_fd = -1; + FILE *fp = NULL; + double *chunk = NULL; + long long offset = 0; + + dup_fd = dup(cdata->fd); + if (dup_fd < 0) { + ret = errno; + fprintf(stderr, "local_data_create_file: dup failed: %s (%d)\n", + strerror(ret), ret); + goto done; + } + fp = fdopen(dup_fd, "w"); + if (!fp) { + ret = errno; + fprintf(stderr, "local_data_create_file: fdopen failed: %s (%d)\n", + strerror(ret), ret); + goto done; + } + ret = test_file_chunk_setup(&chunk); + if (ret) + goto done; + while (offset < opts->length) { + if (fwrite(chunk, VECSUM_CHUNK_SIZE, 1, fp) != 1) { + fprintf(stderr, "local_data_create_file: failed to write to " + "the local file '%s' at offset %lld\n", + opts->path, offset); + ret = EIO; + goto done; + } + offset += VECSUM_CHUNK_SIZE; + } + fprintf(stderr, "local_data_create_file: successfully re-wrote %s as " + "a file of length %lld\n", opts->path, opts->length); + ret = 0; + +done: + if (dup_fd >= 0) { + close(dup_fd); + } + if (fp) { + fclose(fp); + } + free(chunk); + return ret; +} + +static struct local_data *local_data_create(const struct options *opts) +{ + struct local_data *cdata = NULL; + struct stat st_buf; + + cdata = malloc(sizeof(*cdata)); + if (!cdata) { + fprintf(stderr, "Failed to allocate local test data.\n"); + goto error; + } + cdata->fd = -1; + cdata->mmap = MAP_FAILED; + cdata->length = opts->length; + + cdata->fd = open(opts->path, O_RDWR | O_CREAT, 0777); + if (cdata->fd < 0) { + int err = errno; + fprintf(stderr, "local_data_create: failed to open %s " + "for read/write: error %d (%s)\n", opts->path, err, strerror(err)); + goto error; + } + if (fstat(cdata->fd, &st_buf)) { + int err = errno; + fprintf(stderr, "local_data_create: fstat(%s) failed: " + "error %d (%s)\n", opts->path, err, strerror(err)); + goto error; + } + if (st_buf.st_size != opts->length) { + int err; + fprintf(stderr, "local_data_create: current size of %s is %lld, but " + "we want %lld. Re-writing the file.\n", + opts->path, (long long)st_buf.st_size, + (long long)opts->length); + err = local_data_create_file(cdata, opts); + if (err) + goto error; + } + cdata->mmap = mmap(NULL, cdata->length, PROT_READ, + MAP_PRIVATE, cdata->fd, 0); + if (cdata->mmap == MAP_FAILED) { + int err = errno; + fprintf(stderr, "local_data_create: mmap(%s) failed: " + "error %d (%s)\n", opts->path, err, strerror(err)); + goto error; + } + return cdata; + +error: + if (cdata) { + if (cdata->fd >= 0) { + close(cdata->fd); + } + free(cdata); + } + return NULL; +} + +static void local_data_free(struct local_data *cdata) +{ + close(cdata->fd); + munmap(cdata->mmap, cdata->length); +} + +struct libhdfs_data { + hdfsFS fs; + hdfsFile file; + long long length; + double *buf; +}; + +static void libhdfs_data_free(struct libhdfs_data *ldata) +{ + if (ldata->fs) { + free(ldata->buf); + if (ldata->file) { + hdfsCloseFile(ldata->fs, ldata->file); + } + hdfsDisconnect(ldata->fs); + } + free(ldata); +} + +static int libhdfs_data_create_file(struct libhdfs_data *ldata, + const struct options *opts) +{ + int ret; + double *chunk = NULL; + long long offset = 0; + + ldata->file = hdfsOpenFile(ldata->fs, opts->path, O_WRONLY, 0, 1, 0); + if (!ldata->file) { + ret = errno; + fprintf(stderr, "libhdfs_data_create_file: hdfsOpenFile(%s, " + "O_WRONLY) failed: error %d (%s)\n", opts->path, ret, + strerror(ret)); + goto done; + } + ret = test_file_chunk_setup(&chunk); + if (ret) + goto done; + while (offset < opts->length) { + ret = hdfsWrite(ldata->fs, ldata->file, chunk, VECSUM_CHUNK_SIZE); + if (ret < 0) { + ret = errno; + fprintf(stderr, "libhdfs_data_create_file: got error %d (%s) at " + "offset %lld of %s\n", ret, strerror(ret), + offset, opts->path); + goto done; + } else if (ret < VECSUM_CHUNK_SIZE) { + fprintf(stderr, "libhdfs_data_create_file: got short write " + "of %d at offset %lld of %s\n", ret, offset, opts->path); + goto done; + } + offset += VECSUM_CHUNK_SIZE; + } + ret = 0; +done: + free(chunk); + if (ldata->file) { + if (hdfsCloseFile(ldata->fs, ldata->file)) { + fprintf(stderr, "libhdfs_data_create_file: hdfsCloseFile error."); + ret = EIO; + } + ldata->file = NULL; + } + return ret; +} + +static struct libhdfs_data *libhdfs_data_create(const struct options *opts) +{ + struct libhdfs_data *ldata = NULL; + struct hdfsBuilder *builder = NULL; + hdfsFileInfo *pinfo = NULL; + + ldata = calloc(1, sizeof(struct libhdfs_data)); + if (!ldata) { + fprintf(stderr, "Failed to allocate libhdfs test data.\n"); + goto error; + } + builder = hdfsNewBuilder(); + if (!builder) { + fprintf(stderr, "Failed to create builder.\n"); + goto error; + } + hdfsBuilderSetNameNode(builder, opts->rpc_address); + hdfsBuilderConfSetStr(builder, + "dfs.client.read.shortcircuit.skip.checksum", "true"); + ldata->fs = hdfsBuilderConnect(builder); + if (!ldata->fs) { + fprintf(stderr, "Could not connect to default namenode!\n"); + goto error; + } + pinfo = hdfsGetPathInfo(ldata->fs, opts->path); + if (!pinfo) { + int err = errno; + fprintf(stderr, "hdfsGetPathInfo(%s) failed: error %d (%s). " + "Attempting to re-create file.\n", + opts->path, err, strerror(err)); + if (libhdfs_data_create_file(ldata, opts)) + goto error; + } else if (pinfo->mSize != opts->length) { + fprintf(stderr, "hdfsGetPathInfo(%s) failed: length was %lld, " + "but we want length %lld. Attempting to re-create file.\n", + opts->path, (long long)pinfo->mSize, (long long)opts->length); + if (libhdfs_data_create_file(ldata, opts)) + goto error; + } + ldata->file = hdfsOpenFile(ldata->fs, opts->path, O_RDONLY, 0, 0, 0); + if (!ldata->file) { + int err = errno; + fprintf(stderr, "hdfsOpenFile(%s) failed: error %d (%s)\n", + opts->path, err, strerror(err)); + goto error; + } + ldata->length = opts->length; + return ldata; + +error: + if (pinfo) + hdfsFreeFileInfo(pinfo, 1); + if (ldata) + libhdfs_data_free(ldata); + return NULL; +} + +static int check_byte_size(int byte_size, const char *const str) +{ + if (byte_size % sizeof(double)) { + fprintf(stderr, "%s is not a multiple " + "of sizeof(double)\n", str); + return EINVAL; + } + if ((byte_size / sizeof(double)) % DOUBLES_PER_LOOP_ITER) { + fprintf(stderr, "The number of doubles contained in " + "%s is not a multiple of DOUBLES_PER_LOOP_ITER\n", + str); + return EINVAL; + } + return 0; +} + +#ifdef HAVE_INTEL_SSE_INTRINSICS + +#include + +static double vecsum(const double *buf, int num_doubles) +{ + int i; + double hi, lo; + __m128d x0, x1, x2, x3, x4, x5, x6, x7; + __m128d sum0 = _mm_set_pd(0.0,0.0); + __m128d sum1 = _mm_set_pd(0.0,0.0); + __m128d sum2 = _mm_set_pd(0.0,0.0); + __m128d sum3 = _mm_set_pd(0.0,0.0); + __m128d sum4 = _mm_set_pd(0.0,0.0); + __m128d sum5 = _mm_set_pd(0.0,0.0); + __m128d sum6 = _mm_set_pd(0.0,0.0); + __m128d sum7 = _mm_set_pd(0.0,0.0); + for (i = 0; i < num_doubles; i+=DOUBLES_PER_LOOP_ITER) { + x0 = _mm_load_pd(buf + i + 0); + x1 = _mm_load_pd(buf + i + 2); + x2 = _mm_load_pd(buf + i + 4); + x3 = _mm_load_pd(buf + i + 6); + x4 = _mm_load_pd(buf + i + 8); + x5 = _mm_load_pd(buf + i + 10); + x6 = _mm_load_pd(buf + i + 12); + x7 = _mm_load_pd(buf + i + 14); + sum0 = _mm_add_pd(sum0, x0); + sum1 = _mm_add_pd(sum1, x1); + sum2 = _mm_add_pd(sum2, x2); + sum3 = _mm_add_pd(sum3, x3); + sum4 = _mm_add_pd(sum4, x4); + sum5 = _mm_add_pd(sum5, x5); + sum6 = _mm_add_pd(sum6, x6); + sum7 = _mm_add_pd(sum7, x7); + } + x0 = _mm_add_pd(sum0, sum1); + x1 = _mm_add_pd(sum2, sum3); + x2 = _mm_add_pd(sum4, sum5); + x3 = _mm_add_pd(sum6, sum7); + x4 = _mm_add_pd(x0, x1); + x5 = _mm_add_pd(x2, x3); + x6 = _mm_add_pd(x4, x5); + _mm_storeh_pd(&hi, x6); + _mm_storel_pd(&lo, x6); + return hi + lo; +} + +#else + +static double vecsum(const double *buf, int num_doubles) +{ + int i; + double sum = 0.0; + for (i = 0; i < num_doubles; i++) { + sum += buf[i]; + } + return sum; +} + +#endif + +static int vecsum_zcr_loop(int pass, struct libhdfs_data *ldata, + struct hadoopRzOptions *zopts, + const struct options *opts) +{ + int32_t len; + double sum = 0.0; + const double *buf; + struct hadoopRzBuffer *rzbuf = NULL; + int ret; + + while (1) { + rzbuf = hadoopReadZero(ldata->file, zopts, ZCR_READ_CHUNK_SIZE); + if (!rzbuf) { + ret = errno; + fprintf(stderr, "hadoopReadZero failed with error " + "code %d (%s)\n", ret, strerror(ret)); + goto done; + } + buf = hadoopRzBufferGet(rzbuf); + if (!buf) break; + len = hadoopRzBufferLength(rzbuf); + if (len < ZCR_READ_CHUNK_SIZE) { + fprintf(stderr, "hadoopReadZero got a partial read " + "of length %d\n", len); + ret = EINVAL; + goto done; + } + sum += vecsum(buf, + ZCR_READ_CHUNK_SIZE / sizeof(double)); + hadoopRzBufferFree(ldata->file, rzbuf); + } + printf("finished zcr pass %d. sum = %g\n", pass, sum); + ret = 0; + +done: + if (rzbuf) + hadoopRzBufferFree(ldata->file, rzbuf); + return ret; +} + +static int vecsum_zcr(struct libhdfs_data *ldata, + const struct options *opts) +{ + int ret, pass; + struct hadoopRzOptions *zopts = NULL; + + zopts = hadoopRzOptionsAlloc(); + if (!zopts) { + fprintf(stderr, "hadoopRzOptionsAlloc failed.\n"); + ret = ENOMEM; + goto done; + } + if (hadoopRzOptionsSetSkipChecksum(zopts, 1)) { + ret = errno; + perror("hadoopRzOptionsSetSkipChecksum failed: "); + goto done; + } + if (hadoopRzOptionsSetByteBufferPool(zopts, NULL)) { + ret = errno; + perror("hadoopRzOptionsSetByteBufferPool failed: "); + goto done; + } + for (pass = 0; pass < opts->passes; ++pass) { + ret = vecsum_zcr_loop(pass, ldata, zopts, opts); + if (ret) { + fprintf(stderr, "vecsum_zcr_loop pass %d failed " + "with error %d\n", pass, ret); + goto done; + } + hdfsSeek(ldata->fs, ldata->file, 0); + } + ret = 0; +done: + if (zopts) + hadoopRzOptionsFree(zopts); + return ret; +} + +tSize hdfsReadFully(hdfsFS fs, hdfsFile f, void* buffer, tSize length) +{ + uint8_t *buf = buffer; + tSize ret, nread = 0; + + while (length > 0) { + ret = hdfsRead(fs, f, buf, length); + if (ret < 0) { + if (errno != EINTR) { + return -1; + } + } + if (ret == 0) { + break; + } + nread += ret; + length -= ret; + buf += ret; + } + return nread; +} + +static int vecsum_normal_loop(int pass, const struct libhdfs_data *ldata, + const struct options *opts) +{ + double sum = 0.0; + + while (1) { + int res = hdfsReadFully(ldata->fs, ldata->file, ldata->buf, + NORMAL_READ_CHUNK_SIZE); + if (res == 0) // EOF + break; + if (res < 0) { + int err = errno; + fprintf(stderr, "hdfsRead failed with error %d (%s)\n", + err, strerror(err)); + return err; + } + if (res < NORMAL_READ_CHUNK_SIZE) { + fprintf(stderr, "hdfsRead got a partial read of " + "length %d\n", res); + return EINVAL; + } + sum += vecsum(ldata->buf, + NORMAL_READ_CHUNK_SIZE / sizeof(double)); + } + printf("finished normal pass %d. sum = %g\n", pass, sum); + return 0; +} + +static int vecsum_libhdfs(struct libhdfs_data *ldata, + const struct options *opts) +{ + int pass; + + ldata->buf = malloc(NORMAL_READ_CHUNK_SIZE); + if (!ldata->buf) { + fprintf(stderr, "failed to malloc buffer of size %d\n", + NORMAL_READ_CHUNK_SIZE); + return ENOMEM; + } + for (pass = 0; pass < opts->passes; ++pass) { + int ret = vecsum_normal_loop(pass, ldata, opts); + if (ret) { + fprintf(stderr, "vecsum_normal_loop pass %d failed " + "with error %d\n", pass, ret); + return ret; + } + hdfsSeek(ldata->fs, ldata->file, 0); + } + return 0; +} + +static void vecsum_local(struct local_data *cdata, const struct options *opts) +{ + int pass; + + for (pass = 0; pass < opts->passes; pass++) { + double sum = vecsum(cdata->mmap, cdata->length / sizeof(double)); + printf("finished vecsum_local pass %d. sum = %g\n", pass, sum); + } +} + +static long long vecsum_length(const struct options *opts, + const struct libhdfs_data *ldata) +{ + if (opts->ty == VECSUM_LOCAL) { + struct stat st_buf = { 0 }; + if (stat(opts->path, &st_buf)) { + int err = errno; + fprintf(stderr, "vecsum_length: stat(%s) failed: " + "error %d (%s)\n", opts->path, err, strerror(err)); + return -EIO; + } + return st_buf.st_size; + } else { + return ldata->length; + } +} + +/* + * vecsum is a microbenchmark which measures the speed of various ways of + * reading from HDFS. It creates a file containing floating-point 'doubles', + * and computes the sum of all the doubles several times. For some CPUs, + * assembly optimizations are used for the summation (SSE, etc). + */ +int main(void) +{ + int ret = 1; + struct options *opts = NULL; + struct local_data *cdata = NULL; + struct libhdfs_data *ldata = NULL; + struct stopwatch *watch = NULL; + + if (check_byte_size(VECSUM_CHUNK_SIZE, "VECSUM_CHUNK_SIZE") || + check_byte_size(ZCR_READ_CHUNK_SIZE, + "ZCR_READ_CHUNK_SIZE") || + check_byte_size(NORMAL_READ_CHUNK_SIZE, + "NORMAL_READ_CHUNK_SIZE")) { + goto done; + } + opts = options_create(); + if (!opts) + goto done; + if (opts->ty == VECSUM_LOCAL) { + cdata = local_data_create(opts); + if (!cdata) + goto done; + } else { + ldata = libhdfs_data_create(opts); + if (!ldata) + goto done; + } + watch = stopwatch_create(); + if (!watch) + goto done; + switch (opts->ty) { + case VECSUM_LOCAL: + vecsum_local(cdata, opts); + ret = 0; + break; + case VECSUM_LIBHDFS: + ret = vecsum_libhdfs(ldata, opts); + break; + case VECSUM_ZCR: + ret = vecsum_zcr(ldata, opts); + break; + } + if (ret) { + fprintf(stderr, "vecsum failed with error %d\n", ret); + goto done; + } + ret = 0; +done: + fprintf(stderr, "cleaning up...\n"); + if (watch && (ret == 0)) { + long long length = vecsum_length(opts, ldata); + if (length >= 0) { + stopwatch_stop(watch, length * opts->passes); + } + } + if (cdata) + local_data_free(cdata); + if (ldata) + libhdfs_data_free(ldata); + if (opts) + options_free(opts); + return ret; +} + +// vim: ts=4:sw=4:tw=79:et diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test_libhdfs_threaded.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test_libhdfs_threaded.c index 3a8f31dccac..c4ea060ec7d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test_libhdfs_threaded.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test_libhdfs_threaded.c @@ -122,7 +122,7 @@ struct tlhPaths { static int setupPaths(const struct tlhThreadInfo *ti, struct tlhPaths *paths) { - memset(paths, sizeof(*paths), 0); + memset(paths, 0, sizeof(*paths)); if (snprintf(paths->prefix, sizeof(paths->prefix), "/tlhData%04d", ti->threadIdx) >= sizeof(paths->prefix)) { return ENAMETOOLONG;