fis-gtm/sr_port/repl_comm.c

414 lines
15 KiB
C

/****************************************************************
* *
* Copyright 2001, 2013 Fidelity Information Services, Inc *
* *
* This source code contains the intellectual property *
* of its copyright holder(s), and is made available *
* under a license. If you do not know the terms of *
* the license, please stop and do not read further. *
* *
****************************************************************/
#include "mdef.h"
#include <sys/types.h>
#include <sys/time.h>
#include <errno.h>
#ifdef USE_POLL
# include <sys/poll.h>
#endif
#include "gtm_stdio.h"
#include "gtm_string.h"
#include "gtm_socket.h"
#include "gtm_inet.h"
#include "gtm_fcntl.h"
#include "gtm_unistd.h"
#include "gtm_stat.h"
#include "gtmio.h"
#include "repl_msg.h"
#include "repl_errno.h"
#include "repl_dbg.h"
#include "iosp.h"
#include "repl_comm.h"
#include "repl_sp.h"
#include "min_max.h"
#include "rel_quant.h"
#include "repl_log.h"
#include "iotcpdef.h"
#include "gtmmsg.h"
#include "gt_timer.h"
/* These statistics are useful and should perhaps be collected - Vinaya 2003/08/18
*
* Common:
*
* Send:
* # calls to repl_send
* # calls to repl_send with unaligned buffer
* # bytes repl_send called with, distribute into buckets
* % of input bytes repl_send actually sent, or # bytes actually sent distributed into buckets
* # calls to select, and timeout distributed into buckets
* # calls to send
* # calls to select that were interrupted (EINTR)
* # calls to select that were unsuccessful due to system resource shortage (EAGAIN)
* # calls to select that timed out
* # calls to send that were interrupted (EINTR)
* # calls to send that failed due to the message size being too big (EMSGSIZE)
* # calls to send that would have blocked (EWOULDBLOCK)
*
* Receive:
* # calls to repl_recv
* # calls to repl_recv with unaligned buffer
* # bytes repl_recv called with, distribute into buckets
* % of input length repl_recv actually received, or # bytes actually received distributed into buckets
* # calls to select, and timeout distributed into buckets
* # calls to recv
* # calls to select that were interrupted (EINTR)
* # calls to select that were unsuccessful due to system resource shortage (EAGAIN)
* # calls to select that timed out
* # calls to recv that were interrupted (EINTR)
* # calls to recv that failed due to the connection reset (bytes received == 0)
* # calls to recv that would have blocked (EWOULDBLOCK)
*/
/* Globally visible buffer-size variables. They are only defined here; nothing in the code visible in this file assigns
 * or reads them. NOTE(review): presumably they hold the negotiated socket send/receive buffer sizes - confirm with callers.
 */
GBLDEF int repl_max_send_buffsize, repl_max_recv_buffsize;
/* Size in bytes of the circular buffers that keep a copy of the most recent data handed to send() / returned by recv().
 * A smaller buffer is used on HP-PA and VMS builds.
 */
#if defined(__hppa) || defined(__vms)
#define REPL_SEND_TRACE_BUFF_SIZE 65536
#define REPL_RECV_TRACE_BUFF_SIZE 65536
#else
#define REPL_SEND_TRACE_BUFF_SIZE 1048576
#define REPL_RECV_TRACE_BUFF_SIZE 1048576
#endif
/* Number of entries in the circular arrays that remember the sizes of the most recent sends / receives */
#define REPL_SEND_SIZE_TRACE_SIZE 1024
#define REPL_RECV_SIZE_TRACE_SIZE 1024
/* Send-side trace state: next write offset into the byte trace, the byte trace itself (allocated on the first call to
 * repl_send), next slot in the size trace, and the size trace.
 */
STATICDEF int repl_send_trace_buff_pos = 0;
STATICDEF unsigned char * repl_send_trace_buff = 0;
STATICDEF int repl_send_size_trace_pos = 0;
STATICDEF int repl_send_size_trace[REPL_SEND_SIZE_TRACE_SIZE];
/* Receive-side trace state, laid out the same way (the byte trace is allocated on the first call to repl_recv) */
STATICDEF int repl_recv_trace_buff_pos = 0;
STATICDEF unsigned char * repl_recv_trace_buff = 0;
STATICDEF int repl_recv_size_trace_pos = 0;
STATICDEF int repl_recv_size_trace[REPL_RECV_SIZE_TRACE_SIZE];
/* Error mnemonics declared for this module. NOTE(review): none of them is referenced directly by the code visible in
 * this file; they may be needed by macros defined elsewhere - confirm before removing.
 */
error_def(ERR_GETADDRINFO);
error_def(ERR_GETNAMEINFO);
error_def(ERR_GETSOCKNAMERR);
error_def(ERR_TEXT);
/* Append IO_SIZE bytes starting at IO_BUFF to the circular trace buffer TRACE_BUFF, whose capacity is MAX_TRACE_SIZE
 * bytes and whose next write offset is TRACE_BUFF_POS (an lvalue that is updated).
 *
 * - If the data is larger than the whole trace buffer, only its last MAX_TRACE_SIZE bytes are kept and the write
 *   offset is reset to 0.
 * - Otherwise the data is copied at the current offset, wrapping around to the start of the buffer if it does not
 *   fit before the end, and the offset is advanced modulo MAX_TRACE_SIZE.
 *
 * The macro is a single statement (do/while(0)), so it is safe in unbraced if/else bodies, and every argument is
 * parenthesized so expressions may be passed. Arguments are still evaluated more than once: do not pass expressions
 * with side effects.
 */
#define REPL_TRACE_BUFF(TRACE_BUFF, TRACE_BUFF_POS, IO_BUFF, IO_SIZE, MAX_TRACE_SIZE)				\
do														\
{														\
	if ((IO_SIZE) > (MAX_TRACE_SIZE))									\
	{	/* More data than the trace can hold: keep only the tail */					\
		memcpy((TRACE_BUFF), (IO_BUFF) + (IO_SIZE) - (MAX_TRACE_SIZE), (MAX_TRACE_SIZE));		\
		(TRACE_BUFF_POS) = 0;										\
	} else													\
	{													\
		int repl_trace_space_to_end = (MAX_TRACE_SIZE) - (TRACE_BUFF_POS);				\
		if ((IO_SIZE) > repl_trace_space_to_end)							\
		{	/* Fill up to the end of the buffer, then wrap the remainder to the start */		\
			memcpy((TRACE_BUFF) + (TRACE_BUFF_POS), (IO_BUFF), repl_trace_space_to_end);		\
			memcpy((TRACE_BUFF), (IO_BUFF) + repl_trace_space_to_end,				\
				(IO_SIZE) - repl_trace_space_to_end);						\
		} else												\
			memcpy((TRACE_BUFF) + (TRACE_BUFF_POS), (IO_BUFF), (IO_SIZE));				\
		(TRACE_BUFF_POS) = ((TRACE_BUFF_POS) + (IO_SIZE)) % (MAX_TRACE_SIZE);				\
	}													\
} while (0)
/* Wait until a socket is ready for reading or writing, using poll() or select() depending on the build.
 *
 * Parameters:
 *	sock_fd - descriptor to wait on
 *	pollin  - TRUE to wait for readability, FALSE to wait for writability
 *	timeout - maximum wait in milliseconds; asserted to be non-negative and less than one second
 *
 * Returns the poll()/select() status: > 0 when the descriptor is ready, 0 when the wait timed out, and -1 on any
 * failure other than EINTR or EAGAIN (no further calls are made after such a failure, so errno is still the one set
 * by poll()/select() when this function returns).
 */
int fd_ioready(int sock_fd, boolean_t pollin, int timeout)
{
	int		save_errno, status, EAGAIN_cnt = 0;
# ifdef USE_POLL
	struct pollfd	fds;
# else
	fd_set		fds, *readfds, *writefds;
	struct timeval	timeout_spec;
# endif

	assert(timeout < MILLISECS_IN_SEC);
	SELECT_ONLY(timeout = timeout * 1000); /* Convert to microseconds (~ 1sec) */
	assert((timeout >= 0) && (timeout < POLL_ONLY(MILLISECS_IN_SEC) SELECT_ONLY(MICROSEC_IN_SEC)));
# ifdef USE_POLL
	fds.fd = sock_fd;
	fds.events = pollin ? POLLIN : POLLOUT;
# else
	timeout_spec.tv_sec = 0;
	timeout_spec.tv_usec = timeout;
	FD_ZERO(&fds);
	FD_SET(sock_fd, &fds);
	/* The same fd_set is handed to select() as either the read set or the write set; the other one stays NULL */
	writefds = !pollin ? &fds : NULL;
	readfds = pollin ? &fds : NULL;
# endif
	POLL_ONLY(while (-1 == (status = poll(&fds, 1, timeout))))
	SELECT_ONLY(while (-1 == (status = select(sock_fd + 1, readfds, writefds, NULL, &timeout_spec))))
	{
		save_errno = ERRNO;
		if (EINTR == save_errno)
		{	/* Give it another shot. But, halve the timeout so we don't keep doing this forever. */
			timeout = timeout >> 1;
		} else if (EAGAIN == save_errno)
		{	/* Resource starved system; relinquish the processor in the hope that we may get the required resources
			 * next time around.
			 */
			if (0 == ++EAGAIN_cnt % REPL_COMM_LOG_EAGAIN_INTERVAL)
			{
				repl_log(stderr, TRUE, TRUE, "Communication subsystem warning: System appears to be resource "
					"starved. EAGAIN returned from select()/poll() %d times\n", EAGAIN_cnt);
			}
			rel_quant();
		} else
			return -1;
		/* Just in case select() modifies the incoming arguments, restore fd_set and timeout_spec */
		SELECT_ONLY(
			assert(0 == timeout_spec.tv_sec);
			timeout_spec.tv_usec = timeout; /* Note: timeout is the reduced value (in case of EINTR) */
			FD_SET(sock_fd, &fds);
		)
	}
	return status;
}
/* Send data on a replication socket once it becomes writable.
 *
 * Parameters:
 *	sock_fd  - connected socket (asserted not to be FD_INVALID)
 *	buff     - data to send
 *	send_len - in: number of bytes the caller wants sent; out: number of bytes actually sent (0 if nothing was sent)
 *	timeout  - how long to wait for the socket to become writable, in milliseconds (passed to fd_ioready)
 *
 * Returns SS_NORMAL if data was sent or if the wait simply timed out (in which case *send_len is 0). Otherwise returns
 * the errno of the failing call and sets repl_errno to EREPL_SEND (send() failed) or EREPL_SELECT (the readiness wait
 * failed).
 */
int repl_send(int sock_fd, unsigned char *buff, int *send_len, int timeout)
{
	int	send_size, ewouldblock_cnt = 0, emsgsize_cnt = 0, io_ready, save_errno;
	ssize_t	bytes_sent;

	if (!repl_send_trace_buff)
		repl_send_trace_buff = malloc(REPL_SEND_TRACE_BUFF_SIZE);
	/* Note: there is no corresponding free for this malloc since it is only done once per process and will not
	 * accumulate across multiple process invocations. It will be "freed" when the mupip process exits.
	 */
	assert(FD_INVALID != sock_fd);
	send_size = *send_len;
	/* VMS returns SYSTEM-F-INVBUFLEN if send_size is larger than the hard limit VMS_MAX_TCP_SEND_SIZE (64K - 1 on some
	 * implementations, 64K - 512 on some others). VMS_MAX_TCP_SEND_SIZE may be larger than repl_max_send_buffsize, and
	 * empirically we have noticed send() successfully sending repl_max_send_buffsize or more bytes.
	 */
	VMS_ONLY(send_size = MIN(send_size, VMS_MAX_TCP_SEND_SIZE));
	*send_len = 0;
	io_ready = fd_ioready(sock_fd, FALSE, timeout);
	if (0 == io_ready)
		return SS_NORMAL;	/* Timed out waiting for the socket to become writable; nothing sent */
	if (0 > io_ready)
	{	/* The readiness wait itself failed */
		save_errno = ERRNO;
		repl_errno = EREPL_SELECT;
		return save_errno;
	}
	/* Trace last REPL_SEND_SIZE_TRACE_SIZE sizes of what was sent. Note that the size recorded is the size requested,
	 * captured before send() is attempted.
	 */
	repl_send_size_trace[repl_send_size_trace_pos++] = send_size;
	repl_send_size_trace_pos %= ARRAYSIZE(repl_send_size_trace);
	/* Trace last REPL_SEND_TRACE_BUFF_SIZE bytes sent. */
	assert(0 < send_size);
	REPL_TRACE_BUFF(repl_send_trace_buff, repl_send_trace_buff_pos, buff, send_size, REPL_SEND_TRACE_BUFF_SIZE);
	/* The check for EINTR below is valid and should not be converted to an EINTR wrapper macro, because other errno
	 * values are being checked.
	 */
	while (0 > (bytes_sent = send(sock_fd, (char *)buff, send_size, 0)))
	{
		save_errno = ERRNO;
		/* Neither condition is expected; debug builds stop here, production builds fall through and retry */
		assert((EMSGSIZE != save_errno) && (EWOULDBLOCK != save_errno));
		if (EINTR == save_errno)
			continue;
		if (EMSGSIZE == save_errno)
		{	/* Reduce the send size if possible: halve it, but never go below REPL_COMM_MIN_SEND_SIZE. If it is
			 * already at (or below) the minimum, the send is retried with the size unchanged.
			 */
			if (send_size > REPL_COMM_MIN_SEND_SIZE)
			{
				if ((send_size >> 1) <= REPL_COMM_MIN_SEND_SIZE)
					send_size = REPL_COMM_MIN_SEND_SIZE;
				else
					send_size >>= 1;
			}
			if (0 == ++emsgsize_cnt % REPL_COMM_LOG_EMSGSIZE_INTERVAL)
			{
				repl_log(stderr, TRUE, TRUE, "Communication subsystem warning: System appears to be "
					"clogged; EMSGSIZE returned from send %d times\n", emsgsize_cnt);
			}
		} else if (EWOULDBLOCK == save_errno)
		{
			if (0 == ++ewouldblock_cnt % REPL_COMM_LOG_EWDBLCK_INTERVAL)
			{
				repl_log(stderr, TRUE, TRUE, "Communication subsystem warning: System appears to be "
					"running slow; EWOULDBLOCK returned from send %d times\n", ewouldblock_cnt);
			}
			rel_quant(); /* Relinquish our quanta in the hope that things get cleared next time around */
		} else
			break;	/* Unrecoverable error; save_errno holds it */
	}
	if (0 <= bytes_sent)
	{
		*send_len = (int)bytes_sent;
		/* Cast so the argument matches the %ld conversion even where ssize_t is not long */
		REPL_DPRINT2("repl_send: returning with send_len %ld\n", (long)bytes_sent);
		return SS_NORMAL;
	}
	repl_errno = EREPL_SEND;
	return save_errno;
}
/* Receive data from a replication socket once it becomes readable.
 *
 * Parameters:
 *	sock_fd  - connected socket (asserted not to be FD_INVALID)
 *	buff     - buffer that receives the data
 *	recv_len - in: maximum number of bytes to receive; out: number of bytes actually received (0 if none)
 *	timeout  - how long to wait for the socket to become readable, in milliseconds (passed to fd_ioready)
 *
 * Returns SS_NORMAL if data was received or if the wait simply timed out (in which case *recv_len is 0). Otherwise
 * returns an errno value and sets repl_errno to EREPL_RECV (recv() failed or the peer closed the connection, reported
 * as ECONNRESET) or EREPL_SELECT (the readiness wait failed). EWOULDBLOCK from recv() is reported as ETIMEDOUT.
 */
int repl_recv(int sock_fd, unsigned char *buff, int *recv_len, int timeout)
{
	int	max_recv_len, io_ready, save_errno;
	ssize_t	bytes_recvd;

	if (!repl_recv_trace_buff)
		repl_recv_trace_buff = malloc(REPL_RECV_TRACE_BUFF_SIZE);
	/* Note: there is no corresponding free for this malloc since it is only done once per process and will not
	 * accumulate across multiple process invocations. It will be "freed" when the mupip process exits.
	 */
	assert(FD_INVALID != sock_fd);
	max_recv_len = *recv_len;
	/* On VMS, cap the receive request at the hard limit VMS_MAX_TCP_RECV_SIZE; this mirrors the cap repl_send applies
	 * with VMS_MAX_TCP_SEND_SIZE to avoid SYSTEM-F-INVBUFLEN.
	 */
	VMS_ONLY(max_recv_len = MIN(max_recv_len, VMS_MAX_TCP_RECV_SIZE));
	*recv_len = 0;
	io_ready = fd_ioready(sock_fd, TRUE, timeout);
	if (0 == io_ready)
		return SS_NORMAL;	/* Timed out waiting for data; nothing received */
	if (0 > io_ready)
	{	/* The readiness wait itself failed */
		save_errno = ERRNO;
		repl_errno = EREPL_SELECT;
		return save_errno;
	}
	/* Retry recv() for as long as it is merely interrupted by a signal */
	while (0 > (bytes_recvd = recv(sock_fd, (char *)buff, max_recv_len, 0)) && EINTR == ERRNO)
		;
	if (0 < bytes_recvd)
	{
		*recv_len = (int)bytes_recvd;
		/* Cast so the argument matches the %ld conversion even where ssize_t is not long */
		REPL_DPRINT2("repl_recv: returning with recv_len %ld\n", (long)bytes_recvd);
		/* Trace last REPL_RECV_SIZE_TRACE_SIZE sizes of what was received */
		repl_recv_size_trace[repl_recv_size_trace_pos++] = (int)bytes_recvd;
		repl_recv_size_trace_pos %= ARRAYSIZE(repl_recv_size_trace);
		/* Trace last REPL_RECV_TRACE_BUFF_SIZE bytes received. */
		REPL_TRACE_BUFF(repl_recv_trace_buff, repl_recv_trace_buff_pos, buff, bytes_recvd,
			REPL_RECV_TRACE_BUFF_SIZE);
		return (SS_NORMAL); /* always process the received buffer before dealing with any errno */
	}
	save_errno = ERRNO;
	if (0 == bytes_recvd) /* Connection reset */
		save_errno = errno = ECONNRESET;
	else if (ETIMEDOUT == save_errno)
	{
		repl_log(stderr, TRUE, TRUE, "Communication subsystem warning: network may be down;"
			" socket recv() returned ETIMEDOUT\n");
	} else if (EWOULDBLOCK == save_errno)
	{	/* NOTE: Although we use blocking sockets, it is possible to get EWOULDBLOCK error status if receive timeout
		 * has been set and the timeout expired before data was received (from man recv on RH 8 Linux). Some systems
		 * return ETIMEDOUT for the timeout condition.
		 */
		assert(EWOULDBLOCK != save_errno);	/* Always trips here: flags this unexpected case in debug builds */
		repl_log(stderr, TRUE, TRUE, "Communication subsystem warning: network I/O failed to complete; "
			"socket recv() returned EWOULDBLOCK\n");
		save_errno = errno = ETIMEDOUT; /* will be treated as a bad connection and the connection closed */
	}
	repl_errno = EREPL_RECV;
	return save_errno;
}
/* Close the socket referred to by *sock_fd, if it is open.
 *
 * A descriptor equal to FD_INVALID is treated as already closed and 0 is returned. Otherwise the descriptor is closed
 * via CLOSEFILE_RESET, which also resets *sock_fd to FD_INVALID.
 *
 * Returns 0 on success, or the current errno if the close reported a non-zero status.
 */
int repl_close(int *sock_fd)
{
	int	close_status = 0;

	if (FD_INVALID == *sock_fd)
		return 0;	/* Nothing to close */
	CLOSEFILE_RESET(*sock_fd, close_status); /* resets "*sock_fd" to FD_INVALID */
	if (0 != close_status)
		return ERRNO;
	return 0;
}
/* Fetch a SOL_SOCKET-level integer option (which_buf, e.g. SO_SNDBUF or SO_RCVBUF) of sockfd into *buffsize.
 * Returns 0 on success, or errno if getsockopt() fails.
 */
static int get_sock_buff_size(int sockfd, int *buffsize, int which_buf)
{
	GTM_SOCKLEN_TYPE	opt_size;

	opt_size = SIZEOF(*buffsize);
	if (0 != getsockopt(sockfd, SOL_SOCKET, which_buf, (void *)buffsize, (GTM_SOCKLEN_TYPE *)&opt_size))
		return ERRNO;
	return 0;
}
/* Store the socket's send buffer size (SO_SNDBUF) in *send_buffsize. Returns 0 on success, else errno. */
int get_send_sock_buff_size(int sockfd, int *send_buffsize)
{
	int	rc;

	rc = get_sock_buff_size(sockfd, send_buffsize, SO_SNDBUF);
	return rc;
}
/* Store the socket's receive buffer size (SO_RCVBUF) in *recv_buffsize. Returns 0 on success, else errno. */
int get_recv_sock_buff_size(int sockfd, int *recv_buffsize)
{
	int	rc;

	rc = get_sock_buff_size(sockfd, recv_buffsize, SO_RCVBUF);
	return rc;
}
static int set_sock_buff_size(int sockfd, int buflen, int which_buf)
{
int status;
# ifndef sun
size_t optlen;
# else
int optlen;
# endif
optlen = SIZEOF(buflen);
status = setsockopt(sockfd, SOL_SOCKET, which_buf, (void *)&buflen, (GTM_SOCKLEN_TYPE)optlen);
return (0 == status) ? 0 : ERRNO;
}
/* Set the socket's send buffer size (SO_SNDBUF) to buflen. Returns 0 on success, else errno. */
int set_send_sock_buff_size(int sockfd, int buflen)
{
	int	rc;

	rc = set_sock_buff_size(sockfd, buflen, SO_SNDBUF);
	return rc;
}
/* Set the socket's receive buffer size (SO_RCVBUF) to buflen. Returns 0 on success, else errno. */
int set_recv_sock_buff_size(int sockfd, int buflen)
{
	int	rc;

	rc = set_sock_buff_size(sockfd, buflen, SO_RCVBUF);
	return rc;
}
void repl_log_conn_info(int sock_fd, FILE *log_fp)
{
struct sockaddr_storage local, remote;
struct sockaddr *local_sa_ptr, *remote_sa_ptr;
GTM_SOCKLEN_TYPE len;
int save_errno;
char *errptr, local_ip[SA_MAXLEN], remote_ip[SA_MAXLEN];
char port_buffer[NI_MAXSERV];
char local_port_buffer[NI_MAXSERV], remote_port_buffer[NI_MAXSERV];
int errcode;
len = SIZEOF(local);
local_sa_ptr = (struct sockaddr *)&local;
remote_sa_ptr = (struct sockaddr *)&remote;
if (0 == getsockname(sock_fd, local_sa_ptr, (GTM_SOCKLEN_TYPE *)&len))
{
/* translate internal address to numeric ip address */
GETNAMEINFO(local_sa_ptr, len, local_ip, SA_MAXLEN, local_port_buffer, NI_MAXSERV, NI_NUMERICSERV, errcode);
if (0 != errcode)
{
repl_log(log_fp, TRUE, TRUE, "Error getting local name info: %s\n", gai_strerror(errcode));
strcpy(local_port_buffer, "*UNKNOWN*");
strcpy(local_ip, "*UNKNOWN*");
}
} else
{
save_errno = errno;
errptr = (char *)STRERROR(save_errno);
repl_log(log_fp, TRUE, TRUE, "Error getting local name: %s\n", errptr);
strcpy(local_port_buffer, "*UNKNOWN*");
strcpy(local_ip, "*UNKNOWN*");
}
len = SIZEOF(remote);
if (0 == getpeername(sock_fd, remote_sa_ptr, (GTM_SOCKLEN_TYPE *)&len))
{
GETNAMEINFO(remote_sa_ptr, len, remote_ip, SA_MAXLEN, remote_port_buffer, NI_MAXSERV, NI_NUMERICSERV, errcode);
if (0 != errcode)
{
repl_log(log_fp, TRUE, TRUE, "Error getting remote name info: %s\n", gai_strerror(errcode));
strcpy(remote_port_buffer, "*UNKNOWN*");
strcpy(remote_ip, "*UNKNOWN*");
}
} else
{
save_errno = errno;
errptr = (char *)STRERROR(save_errno);
repl_log(log_fp, TRUE, TRUE, "Error getting remote name: %s\n", errptr);
strcpy(remote_port_buffer, "*UNKNOWN*");
strcpy(remote_ip, "*UNKNOWN*");
}
repl_log(log_fp, TRUE, TRUE, "Connection information:: Local: %s:%s Remote: %s:%s\n", local_ip, local_port_buffer,
remote_ip, remote_port_buffer);
return;
}