fis-gtm/sr_unix/gtmsource_process_ops.c

1882 lines
92 KiB
C
Raw Normal View History

/****************************************************************
* *
* Copyright 2006, 2012 Fidelity Information Services, Inc *
* *
* This source code contains the intellectual property *
* of its copyright holder(s), and is made available *
* under a license. If you do not know the terms of *
* the license, please stop and do not read further. *
* *
****************************************************************/
#if defined(__MVS__) && !defined(_ISOC99_SOURCE)
#define _ISOC99_SOURCE
#endif
#include "mdef.h"
#include "gtm_stdio.h" /* for FILE * in repl_comm.h */
#include "gtm_socket.h"
#include "gtm_inet.h"
#include <sys/time.h>
#include <errno.h>
#include "gtm_fcntl.h"
#include "gtm_unistd.h"
#include "gtm_stat.h"
#include "gtm_string.h"
#ifdef VMS
#include <descrip.h> /* Required for gtmsource.h */
#endif
#include "gdsroot.h"
#include "gdsblk.h"
#include "gtm_facility.h"
#include "fileinfo.h"
#include "gdsbt.h"
#include "gdsfhead.h"
#include "filestruct.h"
#include "repl_msg.h"
#include "gtmsource.h"
#include "repl_comm.h"
#include "repl_shutdcode.h"
#include "jnl.h"
#include "hashtab_mname.h" /* needed for muprec.h */
#include "hashtab_int4.h" /* needed for muprec.h */
#include "hashtab_int8.h" /* needed for muprec.h */
#include "buddy_list.h"
#include "muprec.h"
#include "repl_ctl.h"
#include "repl_errno.h"
#include "repl_dbg.h"
#include "iosp.h"
#include "gtm_event_log.h"
#include "gt_timer.h"
#include "eintr_wrappers.h"
#include "repl_sp.h"
#include "repl_filter.h"
#include "repl_log.h"
#include "sgtm_putmsg.h"
#include "min_max.h"
#include "error.h"
#include "repl_instance.h"
#include "ftok_sems.h"
#include "gtmmsg.h"
#include "wbox_test_init.h"
#include "have_crit.h" /* needed for ZLIB_COMPRESS */
#include "deferred_signal_handler.h" /* needed for ZLIB_COMPRESS */
#include "gtm_zlib.h"
#include "replgbl.h"
#include "repl_inst_dump.h" /* for "repl_dump_histinfo" prototype */
GBLREF boolean_t gtmsource_logstats;
GBLREF boolean_t gtmsource_pool2file_transition;
GBLREF boolean_t gtmsource_received_cmp2uncmp_msg;
GBLREF boolean_t secondary_side_std_null_coll;
GBLREF FILE *gtmsource_log_fp;
GBLREF FILE *gtmsource_statslog_fp;
GBLREF gd_addr *gd_header;
GBLREF gtmsource_state_t gtmsource_state;
GBLREF int4 strm_index;
GBLREF int gtmsource_cmpmsgbufsiz;
GBLREF int gtmsource_filter;
GBLREF int gtmsource_log_fd;
GBLREF int gtmsource_msgbufsiz;
GBLREF int gtmsource_sock_fd;
GBLREF int gtmsource_statslog_fd;
GBLREF int repl_filter_bufsiz;
GBLREF int repl_max_send_buffsize, repl_max_recv_buffsize;
GBLREF jnlpool_addrs jnlpool;
GBLREF repl_conn_info_t *this_side, *remote_side;
GBLREF repl_ctl_element *repl_ctl_list;
GBLREF repl_msg_ptr_t gtmsource_cmpmsgp;
GBLREF repl_msg_ptr_t gtmsource_msgp;
GBLREF seq_num gtmsource_save_read_jnl_seqno;
GBLREF seq_num seq_num_zero;
GBLREF struct timeval gtmsource_poll_wait, gtmsource_poll_immediate;
GBLREF uchar_ptr_t repl_filter_buff;
GBLREF uint4 process_id;
GBLREF unsigned char *gtmsource_tcombuffp;
GBLREF unsigned char *gtmsource_tcombuff_start;
error_def(ERR_REPL2OLD);
error_def(ERR_REPLCOMM);
error_def(ERR_REPLFTOKSEM);
error_def(ERR_REPLINSTNOHIST);
error_def(ERR_REPLINSTSECMTCH);
error_def(ERR_REPLNOXENDIAN);
error_def(ERR_REPLWARN);
error_def(ERR_SECNOTSUPPLEMENTARY);
error_def(ERR_STRMNUMMISMTCH1);
error_def(ERR_STRMNUMMISMTCH2);
error_def(ERR_TEXT);
static unsigned char *tcombuff, *msgbuff, *cmpmsgbuff, *filterbuff;
void gtmsource_init_sec_addr(struct sockaddr_in *secondary_addr)
{
gtmsource_local_ptr_t gtmsource_local;
gtmsource_local = jnlpool.gtmsource_local;
memset((char *)secondary_addr, 0, SIZEOF(*secondary_addr));
(*secondary_addr).sin_family = AF_INET;
(*secondary_addr).sin_addr.s_addr = gtmsource_local->secondary_inet_addr;
(*secondary_addr).sin_port = htons(gtmsource_local->secondary_port);
}
int gtmsource_est_conn(struct sockaddr_in *secondary_addr)
{
int connection_attempts, alert_attempts, save_errno, status;
char print_msg[1024], msg_str[1024], *errmsg;
gtmsource_local_ptr_t gtmsource_local;
int send_buffsize, recv_buffsize, tcp_s_bufsize;
gtmsource_local = jnlpool.gtmsource_local;
assert(remote_side == &gtmsource_local->remote_side);
remote_side->proto_ver = REPL_PROTO_VER_UNINITIALIZED;
remote_side->endianness_known = FALSE;
/* Connect to the secondary - use hard tries, soft tries ... */
connection_attempts = 0;
gtmsource_comm_init();
repl_log(gtmsource_log_fp, TRUE, TRUE, "Connect hard tries count = %d, Connect hard tries period = %d\n",
gtmsource_local->connect_parms[GTMSOURCE_CONN_HARD_TRIES_COUNT],
gtmsource_local->connect_parms[GTMSOURCE_CONN_HARD_TRIES_PERIOD]);
do
{
status = gtm_connect(gtmsource_sock_fd, (struct sockaddr *)secondary_addr, SIZEOF(*secondary_addr));
if (0 == status)
break;
repl_log(gtmsource_log_fp, FALSE, FALSE, "%d hard connection attempt failed : %s\n", connection_attempts + 1,
STRERROR(ERRNO));
repl_close(&gtmsource_sock_fd);
if (REPL_MAX_CONN_HARD_TRIES_PERIOD > jnlpool.gtmsource_local->connect_parms[GTMSOURCE_CONN_HARD_TRIES_PERIOD])
SHORT_SLEEP(gtmsource_local->connect_parms[GTMSOURCE_CONN_HARD_TRIES_PERIOD])
else
LONG_SLEEP(gtmsource_local->connect_parms[GTMSOURCE_CONN_HARD_TRIES_PERIOD] % MILLISECS_IN_SEC);
gtmsource_poll_actions(FALSE);
if (GTMSOURCE_CHANGING_MODE == gtmsource_state)
return (SS_NORMAL);
gtmsource_comm_init();
} while (++connection_attempts < gtmsource_local->connect_parms[GTMSOURCE_CONN_HARD_TRIES_COUNT]);
gtmsource_poll_actions(FALSE);
if (GTMSOURCE_CHANGING_MODE == gtmsource_state)
return (SS_NORMAL);
if (gtmsource_local->connect_parms[GTMSOURCE_CONN_HARD_TRIES_COUNT] <= connection_attempts)
{
alert_attempts = DIVIDE_ROUND_DOWN(gtmsource_local->connect_parms[GTMSOURCE_CONN_ALERT_PERIOD],
gtmsource_local->connect_parms[GTMSOURCE_CONN_SOFT_TRIES_PERIOD]);
repl_log(gtmsource_log_fp, TRUE, TRUE, "Soft tries period = %d, Alert period = %d\n",
gtmsource_local->connect_parms[GTMSOURCE_CONN_SOFT_TRIES_PERIOD],
alert_attempts * gtmsource_local->connect_parms[GTMSOURCE_CONN_SOFT_TRIES_PERIOD]);
connection_attempts = 0;
do
{
status = gtm_connect(gtmsource_sock_fd, (struct sockaddr *)secondary_addr, SIZEOF(*secondary_addr));
if (0 == status)
break;
repl_close(&gtmsource_sock_fd);
repl_log(gtmsource_log_fp, TRUE, TRUE, "%d soft connection attempt failed : %s\n",
connection_attempts + 1, STRERROR(ERRNO));
LONG_SLEEP(gtmsource_local->connect_parms[GTMSOURCE_CONN_SOFT_TRIES_PERIOD]);
gtmsource_poll_actions(FALSE);
if (GTMSOURCE_CHANGING_MODE == gtmsource_state)
return (SS_NORMAL);
gtmsource_comm_init();
connection_attempts++;
if (0 == connection_attempts % alert_attempts)
{ /* Log ALERT message */
SNPRINTF(msg_str, SIZEOF(msg_str),
"GTM Replication Source Server : Could not connect to secondary in %d seconds\n",
connection_attempts *
gtmsource_local->connect_parms[GTMSOURCE_CONN_SOFT_TRIES_PERIOD]);
sgtm_putmsg(print_msg, VARLSTCNT(4) ERR_REPLWARN, 2, LEN_AND_STR(msg_str));
repl_log(gtmsource_log_fp, TRUE, TRUE, print_msg);
gtm_event_log(GTM_EVENT_LOG_ARGC, "MUPIP", "REPLWARN", print_msg);
}
} while (TRUE);
}
if (0 != (status = get_send_sock_buff_size(gtmsource_sock_fd, &send_buffsize)))
{
SNPRINTF(msg_str, SIZEOF(msg_str), "Error getting socket send buffsize : %s", STRERROR(status));
rts_error(VARLSTCNT(6) ERR_REPLCOMM, 0, ERR_TEXT, 2, LEN_AND_STR(msg_str));
}
if (send_buffsize < GTMSOURCE_TCP_SEND_BUFSIZE)
{
for (tcp_s_bufsize = GTMSOURCE_TCP_SEND_BUFSIZE;
tcp_s_bufsize >= MAX(send_buffsize, GTMSOURCE_MIN_TCP_SEND_BUFSIZE)
&& 0 != (status = set_send_sock_buff_size(gtmsource_sock_fd, tcp_s_bufsize));
tcp_s_bufsize -= GTMSOURCE_TCP_SEND_BUFSIZE_INCR)
;
if (tcp_s_bufsize < GTMSOURCE_MIN_TCP_SEND_BUFSIZE)
{
SNPRINTF(msg_str, SIZEOF(msg_str), "Could not set TCP send buffer size in range [%d, %d], last "
"known error : %s", GTMSOURCE_MIN_TCP_SEND_BUFSIZE, GTMSOURCE_TCP_SEND_BUFSIZE,
STRERROR(status));
rts_error(VARLSTCNT(6) MAKE_MSG_INFO(ERR_REPLCOMM), 0, ERR_TEXT, 2, LEN_AND_STR(msg_str));
}
}
if (0 != (status = get_send_sock_buff_size(gtmsource_sock_fd, &repl_max_send_buffsize))) /* may have changed */
{
SNPRINTF(msg_str, SIZEOF(msg_str), "Error getting socket send buffsize : %s", STRERROR(status));
rts_error(VARLSTCNT(6) ERR_REPLCOMM, 0, ERR_TEXT, 2, LEN_AND_STR(msg_str));
}
if (0 != (status = get_recv_sock_buff_size(gtmsource_sock_fd, &recv_buffsize)))
{
errmsg = STRERROR(status);
rts_error(VARLSTCNT(10) ERR_REPLCOMM, 0, ERR_TEXT, 2, LEN_AND_LIT("Error getting socket recv buffsize"),
ERR_TEXT, 2, RTS_ERROR_STRING(errmsg));
}
if (recv_buffsize < GTMSOURCE_TCP_RECV_BUFSIZE)
{
if (0 != (status = set_recv_sock_buff_size(gtmsource_sock_fd, GTMSOURCE_TCP_RECV_BUFSIZE)))
{
if (recv_buffsize < GTMSOURCE_MIN_TCP_RECV_BUFSIZE)
{
SNPRINTF(msg_str, SIZEOF(msg_str), "Could not set TCP recv buffer size to %d : %s",
GTMSOURCE_MIN_TCP_RECV_BUFSIZE, STRERROR(status));
rts_error(VARLSTCNT(6) MAKE_MSG_INFO(ERR_REPLCOMM), 0, ERR_TEXT, 2, LEN_AND_STR(msg_str));
}
}
}
if (0 != (status = get_recv_sock_buff_size(gtmsource_sock_fd, &repl_max_recv_buffsize))) /* may have changed */
{
SNPRINTF(msg_str, SIZEOF(msg_str), "Error getting socket recv buffsize : %s", STRERROR(status));
rts_error(VARLSTCNT(6) ERR_REPLCOMM, 0, ERR_TEXT, 2, LEN_AND_STR(msg_str));
}
repl_log(gtmsource_log_fp, TRUE, TRUE, "Connected to secondary, using TCP send buffer size %d receive buffer size %d\n",
repl_max_send_buffsize, repl_max_recv_buffsize);
repl_log_conn_info(gtmsource_sock_fd, gtmsource_log_fp);
/* re-determine compression level on the replication pipe after every connection establishment */
gtmsource_local->repl_zlib_cmp_level = repl_zlib_cmp_level = ZLIB_CMPLVL_NONE;
/* reset any CMP2UNCMP messages received in prior connections. Once a connection encounters a REPL_CMP2UNCMP message
* all further replication on that connection will be uncompressed.
*/
gtmsource_received_cmp2uncmp_msg = FALSE;
return (SS_NORMAL);
}
int gtmsource_alloc_tcombuff(void)
{ /* Allocate buffer for TCOM, ZTCOM records */
if (NULL == tcombuff)
{
assert(NULL == gtmsource_tcombuff_start);
tcombuff = (unsigned char *)malloc(gd_header->n_regions * TCOM_RECLEN + OS_PAGE_SIZE);
gtmsource_tcombuff_start = (unsigned char *)ROUND_UP2((unsigned long)tcombuff, OS_PAGE_SIZE);
}
return (SS_NORMAL);
}
void gtmsource_free_tcombuff(void)
{
if (NULL != tcombuff)
{
free(tcombuff);
tcombuff = gtmsource_tcombuff_start = NULL;
}
return;
}
int gtmsource_alloc_filter_buff(int bufsiz)
{
unsigned char *old_filter_buff, *free_filter_buff;
bufsiz = ROUND_UP2(bufsiz, OS_PAGE_SIZE);
if (gtmsource_filter != NO_FILTER && bufsiz > repl_filter_bufsiz)
{
REPL_DPRINT3("Expanding filter buff from %d to %d\n", repl_filter_bufsiz, bufsiz);
free_filter_buff = filterbuff;
old_filter_buff = repl_filter_buff;
filterbuff = (unsigned char *)malloc(bufsiz + OS_PAGE_SIZE);
repl_filter_buff = (unsigned char *)ROUND_UP2((unsigned long)filterbuff, OS_PAGE_SIZE);
if (NULL != free_filter_buff)
{
assert(NULL != old_filter_buff);
memcpy(repl_filter_buff, old_filter_buff, repl_filter_bufsiz);
free(free_filter_buff);
}
repl_filter_bufsiz = bufsiz;
}
return (SS_NORMAL);
}
void gtmsource_free_filter_buff(void)
{
if (NULL != filterbuff)
{
assert(NULL != repl_filter_buff);
free(filterbuff);
filterbuff = repl_filter_buff = NULL;
repl_filter_bufsiz = 0;
}
}
int gtmsource_alloc_msgbuff(int maxbuffsize)
{ /* Allocate message buffer */
repl_msg_ptr_t oldmsgp;
unsigned char *free_msgp;
assert(MIN_REPL_MSGLEN < maxbuffsize);
maxbuffsize = ROUND_UP2(maxbuffsize, OS_PAGE_SIZE);
if ((maxbuffsize > gtmsource_msgbufsiz) || (NULL == gtmsource_msgp))
{
REPL_DPRINT3("Expanding message buff from %d to %d\n", gtmsource_msgbufsiz, maxbuffsize);
free_msgp = msgbuff;
oldmsgp = gtmsource_msgp;
msgbuff = (unsigned char *)malloc(maxbuffsize + OS_PAGE_SIZE);
gtmsource_msgp = (repl_msg_ptr_t)ROUND_UP2((unsigned long)msgbuff, OS_PAGE_SIZE);
if (NULL != free_msgp)
{ /* Copy existing data */
assert(NULL != oldmsgp);
memcpy((unsigned char *)gtmsource_msgp, (unsigned char *)oldmsgp, gtmsource_msgbufsiz);
free(free_msgp);
}
gtmsource_msgbufsiz = maxbuffsize;
if (ZLIB_CMPLVL_NONE != gtm_zlib_cmp_level)
{ /* Compression is enabled. Allocate parallel buffers to hold compressed journal records.
* Allocate extra space just in case compression actually expands the data (needed only in rare cases).
*/
free_msgp = cmpmsgbuff;
cmpmsgbuff = (unsigned char *)malloc((maxbuffsize * MAX_CMP_EXPAND_FACTOR) + OS_PAGE_SIZE);
gtmsource_cmpmsgp = (repl_msg_ptr_t)ROUND_UP2((unsigned long)cmpmsgbuff, OS_PAGE_SIZE);
if (NULL != free_msgp)
free(free_msgp);
gtmsource_cmpmsgbufsiz = (maxbuffsize * MAX_CMP_EXPAND_FACTOR);
}
gtmsource_alloc_filter_buff(gtmsource_msgbufsiz);
}
return (SS_NORMAL);
}
void gtmsource_free_msgbuff(void)
{
if (NULL != msgbuff)
{
assert(NULL != gtmsource_msgp);
free(msgbuff);
msgbuff = NULL;
gtmsource_msgp = NULL;
gtmsource_msgbufsiz = 0;
if (ZLIB_CMPLVL_NONE != gtm_zlib_cmp_level)
{ /* Compression is enabled. Free up compression buffer as well. */
assert(NULL != gtmsource_cmpmsgp);
free(cmpmsgbuff);
cmpmsgbuff = NULL;
gtmsource_cmpmsgp = NULL;
gtmsource_cmpmsgbufsiz = 0;
}
}
}
/* Receive starting jnl_seqno for (re)starting transmission */
int gtmsource_recv_restart(seq_num *recvd_jnl_seqno, int *msg_type, int *start_flags)
{
boolean_t msg_is_cross_endian;
fd_set input_fds;
repl_msg_t msg;
unsigned char *msg_ptr; /* needed for REPL_{SEND,RECV}_LOOP */
int tosend_len, sent_len, sent_this_iter; /* needed for REPL_SEND_LOOP */
int torecv_len, recvd_len, recvd_this_iter; /* needed for REPL_RECV_LOOP */
int status; /* needed for REPL_{SEND,RECV}_LOOP */
unsigned char seq_num_str[32], *seq_num_ptr;
repl_msg_t xoff_ack;
# ifdef DEBUG
boolean_t remote_side_endianness_known;
# endif
DCL_THREADGBL_ACCESS;
SETUP_THREADGBL_ACCESS;
status = SS_NORMAL;
assert(remote_side == &jnlpool.gtmsource_local->remote_side);
for ( ; SS_NORMAL == status; )
{
repl_log(gtmsource_log_fp, TRUE, TRUE, "Waiting for REPL_START_JNL_SEQNO or REPL_FETCH_RESYNC message\n");
REPL_RECV_LOOP(gtmsource_sock_fd, &msg, MIN_REPL_MSGLEN, FALSE, &gtmsource_poll_wait)
{
gtmsource_poll_actions(FALSE);
if (GTMSOURCE_CHANGING_MODE == gtmsource_state)
return (SS_NORMAL);
}
DEBUG_ONLY(remote_side_endianness_known = remote_side->endianness_known);
if (SS_NORMAL == status)
{ /* If endianness of other side is not yet known, determine that now by seeing if the msg.len is
* greater than expected. If it is, convert it and see if it is now what we expect. If it is,
* then the other system is of opposite endianness. Note: We would normally use msg.type since
* it is effectively an enum and we control by adding new messages. But, REPL_START_JNL_SEQNO
* is lucky number zero which means it is identical on systems of either endianness.
*
* If endianness of other side is not yet known, determine that from the message length as we
* expect it to be MIN_REPL_MSGLEN. There is one exception though. If a pre-V55000 receiver sends
* a REPL_XOFF_ACK_ME message, it could send it in the receiver's native-endian or cross-endian
* format (depending on how its global variable "src_node_same_endianness" is initialized). This
* bug in the receiver server logic is fixed V55000 onwards (proto_ver is REPL_PROTO_VER_SUPPLEMENTARY).
* Therefore, in this case, we cannot use the endianness of the REPL_XOFF_ACK_ME message to determine
* the endianness of the connection. In this case, wait for the next non-REPL_XOFF_ACK_ME message
* to determine the connection endianness. Handle this exception case correctly.
*
* If on the other hand, we know the endianness of the other side, we still cannot guarantee which
* endianness a REPL_XOFF_ACK_ME message is sent in (in pre-V55000 versions for example in V53004A where
* it is sent in receiver native endian format whereas in V54002B it is sent in source native
* endian format). So better be safe on the source side and handle those cases like we did when
* we did not know the endianness of the remote side.
*
* The below check works as all messages we expect at this point have a fixed length of MIN_REPL_MSGLEN.
*/
msg_is_cross_endian = (((unsigned)MIN_REPL_MSGLEN < (unsigned)msg.len)
&& ((unsigned)MIN_REPL_MSGLEN == GTM_BYTESWAP_32((unsigned)msg.len)));
if (msg_is_cross_endian)
{
msg.type = GTM_BYTESWAP_32(msg.type);
msg.len = GTM_BYTESWAP_32(msg.len);
}
assert(msg.type == REPL_START_JNL_SEQNO || msg.type == REPL_FETCH_RESYNC || msg.type == REPL_XOFF_ACK_ME);
assert(MIN_REPL_MSGLEN == msg.len);
/* If we dont yet know the endianness of the other side and the input message is not a REPL_XOFF_ACK_ME
* we can decide the endianness of the receiver side by the endianness of the input message.
* REPL_XOFF_ACK_ME is an exception due to its handling by pre-V5500 versions (described in comments above).
*/
if (!remote_side->endianness_known && (REPL_XOFF_ACK_ME != msg.type))
{
remote_side->endianness_known = TRUE;
remote_side->cross_endian = msg_is_cross_endian;
if (remote_side->cross_endian)
repl_log(gtmsource_log_fp, TRUE, TRUE, "Source and Receiver sides have opposite "
"endianness\n");
else
repl_log(gtmsource_log_fp, TRUE, TRUE, "Source and Receiver sides have same endianness\n");
}
/* We only expect REPL_START_JNL_SEQNO and REPL_XOFF_ACK_ME messages to be sent once the endianness of
* the remote side has been determined. We dont expect the REPL_FETCH_RESYNC message to be ever sent in
* the middle of a handshake (i.e. after the remote side endianness has been determined). Assert that.
* The logic that sets "msg_is_cross_endian" relies on this. If this assert fails, the logic has to change.
*/
assert((REPL_FETCH_RESYNC != msg.type) || !remote_side_endianness_known);
*msg_type = msg.type;
*start_flags = START_FLAG_NONE;
memcpy((uchar_ptr_t)recvd_jnl_seqno, (uchar_ptr_t)&msg.msg[0], SIZEOF(seq_num));
if (REPL_START_JNL_SEQNO == msg.type)
{
if (msg_is_cross_endian)
*recvd_jnl_seqno = GTM_BYTESWAP_64(*recvd_jnl_seqno);
repl_log(gtmsource_log_fp, TRUE, TRUE, "Received REPL_START_JNL_SEQNO message with SEQNO "
"%llu [0x%llx]\n", INT8_PRINT(*recvd_jnl_seqno), INT8_PRINT(*recvd_jnl_seqno));
if (msg_is_cross_endian)
((repl_start_msg_ptr_t)&msg)->start_flags =
GTM_BYTESWAP_32(((repl_start_msg_ptr_t)&msg)->start_flags);
*start_flags = ((repl_start_msg_ptr_t)&msg)->start_flags;
assert(!msg_is_cross_endian || (NODE_ENDIANNESS != ((repl_start_msg_ptr_t)&msg)->node_endianness));
if (*start_flags & START_FLAG_STOPSRCFILTER)
{
repl_log(gtmsource_log_fp, TRUE, TRUE,
"Start JNL_SEQNO msg tagged with STOP SOURCE FILTER\n");
if (gtmsource_filter & EXTERNAL_FILTER)
{
repl_stop_filter();
gtmsource_filter &= ~EXTERNAL_FILTER;
} else
repl_log(gtmsource_log_fp, TRUE, TRUE,
"Filter is not active, ignoring STOP SOURCE FILTER msg\n");
*msg_type = REPL_START_JNL_SEQNO;
}
/* Determine the protocol version of the receiver side. That information is encoded in the
* "proto_ver" field of the message from V51 onwards but to differentiate V50 vs V51 we need
* to check if the START_FLAG_VERSION_INFO bit is set in start_flags. If not the protocol is V50.
* V51 implies support for multi-site while V50 implies dual-site configuration only.
*/
if (*start_flags & START_FLAG_VERSION_INFO)
{
assert(REPL_PROTO_VER_DUALSITE != ((repl_start_msg_ptr_t)&msg)->proto_ver);
remote_side->is_supplementary = ((repl_start_msg_ptr_t)&msg)->is_supplementary;
remote_side->proto_ver = ((repl_start_msg_ptr_t)&msg)->proto_ver;
} else
{ /* Issue REPL2OLD error because receiver is dual-site */
rts_error(VARLSTCNT(6) ERR_REPL2OLD, 4, LEN_AND_STR(UNKNOWN_INSTNAME),
LEN_AND_STR(jnlpool.repl_inst_filehdr->inst_info.this_instname));
}
assert(*start_flags & START_FLAG_HASINFO); /* V4.2+ versions have jnl ver in the start msg */
remote_side->jnl_ver = ((repl_start_msg_ptr_t)&msg)->jnl_ver;
REPL_DPRINT3("Local jnl ver is octal %o, remote jnl ver is octal %o\n",
this_side->jnl_ver, remote_side->jnl_ver);
repl_check_jnlver_compat(!remote_side->cross_endian);
assert(remote_side->jnl_ver > V15_JNL_VER || 0 == (*start_flags & START_FLAG_COLL_M));
if (remote_side->jnl_ver <= V15_JNL_VER)
*start_flags &= ~START_FLAG_COLL_M; /* zap it for pro, just in case */
remote_side->is_std_null_coll = (*start_flags & START_FLAG_COLL_M) ? TRUE : FALSE;
assert((remote_side->jnl_ver >= V19_JNL_VER) || (0 == (*start_flags & START_FLAG_TRIGGER_SUPPORT)));
if (remote_side->jnl_ver < V19_JNL_VER)
*start_flags &= ~START_FLAG_TRIGGER_SUPPORT; /* zap it for pro, just in case */
remote_side->trigger_supported = (*start_flags & START_FLAG_TRIGGER_SUPPORT) ? TRUE : FALSE;
# ifdef GTM_TRIGGER
if (!remote_side->trigger_supported)
repl_log(gtmsource_log_fp, TRUE, TRUE, "Warning : Secondary does not support GT.M "
"database triggers. #t updates on primary will not be replicated\n");
# endif
/* remote_side->null_subs_xform is initialized later in function "gtmsource_process" */
(TREF(replgbl)).trig_replic_warning_issued = FALSE;
return (SS_NORMAL);
} else if (REPL_FETCH_RESYNC == msg.type)
{ /* Determine the protocol version of the receiver side.
* Take care to set the flush parameter in repl_log calls below to FALSE until at least the
* first message gets sent back. This is so the fetchresync rollback on the other side does
* not timeout before receiving a response. */
remote_side->proto_ver = (RECAST(repl_resync_msg_ptr_t)&msg)->proto_ver;
remote_side->is_supplementary = (RECAST(repl_resync_msg_ptr_t)&msg)->is_supplementary;
/* The following fields dont need to be initialized since they are needed (for internal filter
* transformations) only if we send journal records across. REPL_FETCH_RESYNC causes only
* protocol messages to be exchanged (no journal records).
* remote_side->jnl_ver = ...
* remote_side->is_std_null_coll = ...
* remote_side->trigger_supported = ...
* remote_side->null_subs_xform = ...
*/
if (msg_is_cross_endian)
*recvd_jnl_seqno = GTM_BYTESWAP_64(*recvd_jnl_seqno);
repl_log(gtmsource_log_fp, TRUE, FALSE, "Received REPL_FETCH_RESYNC message with SEQNO "
"%llu [0x%llx]\n", INT8_PRINT(*recvd_jnl_seqno), INT8_PRINT(*recvd_jnl_seqno));
return (SS_NORMAL);
} else if (REPL_XOFF_ACK_ME == msg.type)
{
repl_log(gtmsource_log_fp, TRUE, FALSE, "Received REPL_XOFF_ACK_ME message. "
"Possible crash/shutdown of update process\n");
/* Send XOFF_ACK */
xoff_ack.type = REPL_XOFF_ACK;
if (msg_is_cross_endian)
*recvd_jnl_seqno = GTM_BYTESWAP_64(*recvd_jnl_seqno);
memcpy((uchar_ptr_t)&xoff_ack.msg[0], (uchar_ptr_t)recvd_jnl_seqno, SIZEOF(seq_num));
xoff_ack.len = MIN_REPL_MSGLEN;
repl_log(gtmsource_log_fp, TRUE, TRUE, "Sending REPL_XOFF_ACK message\n");
REPL_SEND_LOOP(gtmsource_sock_fd, &xoff_ack, xoff_ack.len, FALSE, &gtmsource_poll_immediate)
{
gtmsource_poll_actions(FALSE);
if (GTMSOURCE_CHANGING_MODE == gtmsource_state)
return (SS_NORMAL);
}
} else
{ /* If unknown message is received, close connection. Caller will reopen the same. */
repl_log(gtmsource_log_fp, TRUE, TRUE, "Received UNKNOWN message (type = %d). "
"Closing connection.\n", msg.type);
assert(FALSE);
repl_close(&gtmsource_sock_fd);
SHORT_SLEEP(GTMSOURCE_WAIT_FOR_RECEIVER_CLOSE_CONN);
gtmsource_state = jnlpool.gtmsource_local->gtmsource_state = GTMSOURCE_WAITING_FOR_CONNECTION;
return (SS_NORMAL);
}
}
}
return (status);
}
int gtmsource_srch_restart(seq_num recvd_jnl_seqno, int recvd_start_flags)
{
seq_num cur_read_jnl_seqno;
qw_off_t cur_read_addr;
uint4 cur_read, prev_read, prev_tr_size, jnlpool_size;
int save_lastwrite_len;
unsigned char seq_num_str[32], *seq_num_ptr;
jnlpool_ctl_ptr_t jctl;
gtmsource_local_ptr_t gtmsource_local;
gd_region *reg, *region_top;
sgmnt_addrs *csa, *repl_csa;
jctl = jnlpool.jnlpool_ctl;
jnlpool_size = jctl->jnlpool_size;
gtmsource_local = jnlpool.gtmsource_local;
assert(recvd_jnl_seqno <= jctl->jnl_seqno);
cur_read_jnl_seqno = gtmsource_local->read_jnl_seqno;
if (recvd_jnl_seqno > cur_read_jnl_seqno)
{ /* The secondary is requesting a seqno higher than what we last remember having sent but yet it is in sync with us
* upto seqno "recvd_jnl_seqno" as otherwise the caller would have determined it is out of sync and not even call
* us. To illustrate an example of how this could happen, consider an INSTA->INSTB replication and INSTA->INSTB
* replication going on. Lets say INSTA's journal sequence number is at 100. INSTB is at 60 and INSTB is at 30.
* This means, last sequence number sent by INSTA to INSTB is 60 and to INSTB is 30. Now, lets say INSTA is shutdown
* and INSTB comes up as the primary to INSTC and starts replicating the 30 updates thereby bringing both INSTB and
* INSTC sequence number to 60. Now, if INSTA comes backup again as primary against INSTC, we will have a case where
* gtmsource_local->read_jnl_seqno as 30, but recvd_jnl_seqno as 60. This means that we are going to bump
* "gtmsource_local->read_jnl_seqno" up to the received seqno (in the later call to "gtmsource_flush_fh") without
* knowing how many bytes of transaction data that meant (to correspondingly bump up "gtmsource_local->read_addr").
* The only case where we dont care to maintain "read_addr" is if we are in READ_FILE mode AND the current
* read_jnl_seqno and the received seqno is both lesser than or equal to "gtmsource_save_read_jnl_seqno". Except
* for that case, we need to reset "gtmsource_save_read_jnl_seqno" to correspond to the current jnl seqno.
*/
if ((READ_FILE != gtmsource_local->read_state) || (recvd_jnl_seqno > gtmsource_save_read_jnl_seqno))
{
GRAB_LOCK(jnlpool.jnlpool_dummy_reg, ASSERT_NO_ONLINE_ROLLBACK);
gtmsource_local->read_state = READ_FILE;
gtmsource_save_read_jnl_seqno = jctl->jnl_seqno;
gtmsource_local->read_addr = jnlpool.jnlpool_ctl->write_addr;
gtmsource_local->read = jnlpool.jnlpool_ctl->write;
rel_lock(jnlpool.jnlpool_dummy_reg);
}
} else if (READ_POOL == gtmsource_local->read_state)
{ /* Follow the back-chain in the Journal Pool to find whether or not recvd_jnl_seqno is in the Pool */
/* The implementation for searching through the back chain has several inefficiences. We are deferring addressing
* them to keep code changes for V4.4-003 to a minimum. We should address these in an upcoming release.
* Vinaya 2003, Oct 02 */
QWASSIGN(cur_read_addr, gtmsource_local->read_addr);
cur_read = gtmsource_local->read;
if (jnlpool_hasnt_overflowed(jctl, jnlpool_size, cur_read_addr) &&
QWGT(cur_read_jnl_seqno, recvd_jnl_seqno) &&
QWGT(cur_read_jnl_seqno, jctl->start_jnl_seqno))
{
if (QWGE(jctl->early_write_addr, cur_read_addr))
{ /* If there is no more input to be read, the previous transaction size should not be read from the
* journal pool since the read pointers point to the next read. In such a case, we can find the
* size of the transcation cur_read_jnl_seqno from jctl->lastwrite_len. We should access
* lastwrite_len after a memory barrier to avoid reading a stale value. We rely on the memory
* barrier done in jnlpool_hasnt_overflowed */
save_lastwrite_len = jctl->lastwrite_len;
if (QWEQ(jctl->early_write_addr, cur_read_addr))
{ /* GT.M is not writing any transaction, safe to rely on jctl->lastwrite_len. Note,
* GT.M could not have been writing transaction cur_read_jnl_seqno if we are here. Also,
* lastwrite_len cannot be in the future w.r.t early_write_addr because of the memory
* barriers we do in t{p}_{t}end.c. It can be behind by atmost one transaction
* (cur_read_jnl_seqno). Well, we want the length of transaction cur_read_jnl_seqno,
* not cur_read_jnl_seqno + 1.
*/
QWDECRBYDW(cur_read_addr, save_lastwrite_len);
QWDECRBYDW(cur_read_jnl_seqno, 1);
prev_read = cur_read;
if (cur_read > save_lastwrite_len)
cur_read -= save_lastwrite_len;
else
{
cur_read = cur_read - (save_lastwrite_len % jnlpool_size);
if (cur_read >= prev_read)
cur_read += jnlpool_size;
}
assert(cur_read == QWMODDW(cur_read_addr, jnlpool_size));
REPL_DPRINT2("Srch restart : No more input in jnlpool, backing off to read_jnl_seqno : "
INT8_FMT, INT8_PRINT(cur_read_jnl_seqno));
REPL_DPRINT3(" read_addr : "INT8_FMT" read : %d\n", INT8_PRINT(cur_read_addr), cur_read);
}
}
if (QWEQ(cur_read_addr, jctl->write_addr))
{ /* we caught a GTM process writing cur_read_jnl_seqno + 1, we cannot rely on lastwrite_len as it
* may or may not have changed. Wait until the GTM process finishes writing this transaction */
repl_log(gtmsource_log_fp, TRUE, FALSE, "SEARCHING RESYNC POINT IN POOL : Waiting for GTM process "
"to finish writing journal records to the pool\n");
while (QWEQ(cur_read_addr, jctl->write_addr))
{
SHORT_SLEEP(GTMSOURCE_WAIT_FOR_JNL_RECS);
gtmsource_poll_actions(FALSE);
}
repl_log(gtmsource_log_fp, TRUE, FALSE, "SEARCHING RESYNC POINT IN POOL : GTM process finished "
"writing journal records to the pool\n");
}
}
while (jnlpool_hasnt_overflowed(jctl, jnlpool_size, cur_read_addr) &&
QWGT(cur_read_jnl_seqno, recvd_jnl_seqno) &&
QWGT(cur_read_jnl_seqno, jctl->start_jnl_seqno))
{
assert(cur_read + SIZEOF(jnldata_hdr_struct) <= jnlpool_size);
prev_tr_size = ((jnldata_hdr_ptr_t)(jnlpool.jnldata_base + cur_read))->prev_jnldata_len;
if (jnlpool_hasnt_overflowed(jctl, jnlpool_size, cur_read_addr))
{
QWDECRBYDW(cur_read_addr, prev_tr_size);
prev_read = cur_read;
cur_read -= prev_tr_size;
if (cur_read >= prev_read)
cur_read += jnlpool_size;
assert(cur_read == QWMODDW(cur_read_addr, jnlpool_size));
QWDECRBYDW(cur_read_jnl_seqno, 1);
REPL_DPRINT2("Srch restart : No overflow yet, backing off to read_jnl_seqno : "INT8_FMT,
INT8_PRINT(cur_read_jnl_seqno));
REPL_DPRINT3(" read_addr : "INT8_FMT" read : %d\n", INT8_PRINT(cur_read_addr), cur_read);
continue;
}
break;
}
QWASSIGN(gtmsource_local->read_addr, cur_read_addr);
gtmsource_local->read = cur_read;
if (jnlpool_hasnt_overflowed(jctl, jnlpool_size, cur_read_addr) &&
QWEQ(cur_read_jnl_seqno, recvd_jnl_seqno) &&
QWGE(cur_read_jnl_seqno, jctl->start_jnl_seqno))
{
REPL_DPRINT2("Srch restart : Now in READ_POOL state read_jnl_seqno : "INT8_FMT,
INT8_PRINT(cur_read_jnl_seqno));
REPL_DPRINT3(" read_addr : "INT8_FMT" read : %d\n",INT8_PRINT(cur_read_addr), cur_read);
} else
{
/* Overflow, or requested seqno too far back to be in pool */
REPL_DPRINT2("Srch restart : Now in READ_FILE state. Changing sync point to read_jnl_seqno : "INT8_FMT,
INT8_PRINT(cur_read_jnl_seqno));
REPL_DPRINT3(" read_addr : "INT8_FMT" read : %d ", INT8_PRINT(cur_read_addr), cur_read);
REPL_DPRINT2("save_read_jnl_seqno : "INT8_FMT"\n", INT8_PRINT(gtmsource_save_read_jnl_seqno));
QWASSIGN(gtmsource_save_read_jnl_seqno, cur_read_jnl_seqno);
if (QWLT(gtmsource_save_read_jnl_seqno, jctl->start_jnl_seqno))
{
QWASSIGN(gtmsource_save_read_jnl_seqno, jctl->start_jnl_seqno);
assert(QWEQ(gtmsource_local->read_addr, seq_num_zero));
assert(gtmsource_local->read == 0);
/* For pro version, force zero assignment */
QWASSIGN(gtmsource_local->read_addr, seq_num_zero);
gtmsource_local->read = 0;
REPL_DPRINT2("Srch restart : Sync point "INT8_FMT, INT8_PRINT(gtmsource_save_read_jnl_seqno));
REPL_DPRINT2(" beyond start_seqno : "INT8_FMT, INT8_PRINT(jctl->start_jnl_seqno));
REPL_DPRINT3(", sync point set to read_addr : "INT8_FMT" read : %d\n",
INT8_PRINT(gtmsource_local->read_addr), gtmsource_local->read);
}
gtmsource_local->read_state = READ_FILE;
repl_log(gtmsource_log_fp, TRUE, FALSE, "Source server now reading from journal files; journal pool "
"does not contain transaction %llu [0x%llx]\n", recvd_jnl_seqno, recvd_jnl_seqno);
gtmsource_pool2file_transition = TRUE;
}
} else /* read_state is READ_FILE and requesting a sequence number less than or equal to read_jnl_seqno */
{
assert(cur_read_jnl_seqno == gtmsource_local->read_jnl_seqno);
if (cur_read_jnl_seqno > gtmsource_save_read_jnl_seqno)
gtmsource_save_read_jnl_seqno = cur_read_jnl_seqno;
REPL_DPRINT2("Srch restart : Continuing in READ_FILE state. Retaining sync point for read_jnl_seqno : "INT8_FMT,
INT8_PRINT(cur_read_jnl_seqno));
REPL_DPRINT2(" at read_addr : "INT8_FMT, INT8_PRINT(gtmsource_local->read_addr));
REPL_DPRINT3(" read : %d corresponding to save_read_jnl_seqno : "INT8_FMT"\n", gtmsource_local->read,
INT8_PRINT(gtmsource_save_read_jnl_seqno));
repl_log(gtmsource_log_fp, TRUE, FALSE, "Source server continuing to read from journal files at seqno "
"%llu [0x%llx]\n", recvd_jnl_seqno, recvd_jnl_seqno);
}
REPL_DPRINT2("Setting resync_seqno to "INT8_FMT"\n", INT8_PRINT(recvd_jnl_seqno));
repl_log(gtmsource_log_fp, TRUE, FALSE, "Source server last sent seqno %llu [0x%llx]\n",
cur_read_jnl_seqno, cur_read_jnl_seqno);
repl_log(gtmsource_log_fp, TRUE, FALSE, "Source server will start sending from seqno %llu [0x%llx]\n",
recvd_jnl_seqno, recvd_jnl_seqno);
if (READ_POOL == gtmsource_local->read_state)
repl_log(gtmsource_log_fp, TRUE, TRUE, "Source server now reading from the journal POOL\n");
else
repl_log(gtmsource_log_fp, TRUE, TRUE, "Source server now reading from the journal FILES\n");
/* Finally set "gtmsource_local->read_jnl_seqno" to be "recvd_jnl_seqno" and flush changes to instance file on disk */
gtmsource_flush_fh(recvd_jnl_seqno);
assert(GTMSOURCE_HANDLE_ONLN_RLBK != gtmsource_state);
return (SS_NORMAL);
}
int gtmsource_get_jnlrecs(uchar_ptr_t buff, int *data_len, int maxbufflen, boolean_t read_multiple)
{
int total_tr_len;
unsigned char seq_num_str[32], *seq_num_ptr;
jnlpool_ctl_ptr_t jctl;
gtmsource_local_ptr_t gtmsource_local;
seq_num jnl_seqno, read_jnl_seqno;
qw_num write_addr, read_addr;
jctl = jnlpool.jnlpool_ctl;
gtmsource_local = jnlpool.gtmsource_local;
write_addr = jctl->write_addr;
jnl_seqno = jctl->jnl_seqno;
read_jnl_seqno = gtmsource_local->read_jnl_seqno;
read_addr = gtmsource_local->read_addr;
assert(read_addr <= write_addr);
assert((0 != write_addr) || (read_jnl_seqno <= jctl->start_jnl_seqno));
#ifdef GTMSOURCE_ALWAYS_READ_FILES
gtmsource_local->read_state = READ_FILE;
#endif
switch(gtmsource_local->read_state)
{
case READ_POOL:
#ifndef GTMSOURCE_ALWAYS_READ_FILES_STRESS
if (read_addr == write_addr)
{ /* Nothing to read. While reading pool, the comparison of read_addr against write_addr is
* the only reliable indicator if there are any transactions to be read. This is due to
* the placement of memory barriers in t_end/tp_tend.c. Also, since we do not issue memory
* barrier here, we may be reading a stale value of write_addr in which case we may conclude
* that there is nothing to read. But, this will not continue forever as the source server
* eventually (decided by architecture's implementation) will see the change to write_addr.
*/
*data_len = 0;
return (0);
}
total_tr_len = gtmsource_readpool(buff, data_len, maxbufflen, read_multiple, write_addr);
if (GTMSOURCE_SEND_NEW_HISTINFO == gtmsource_state)
return (0); /* need to send REPL_HISTREC message before sending any more seqnos */
if (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state)
return (0); /* Connection got reset in call to "gtmsource_readpool" */
if (0 < total_tr_len)
return (total_tr_len);
if (0 < *data_len)
return (-1);
#endif /* for GTMSOURCE_ALWAYS_READ_FILES_STRESS, we let the source server switch back and forth between pool read and file read */
/* Overflow, switch to READ_FILE */
gtmsource_local->read_state = READ_FILE;
QWASSIGN(gtmsource_save_read_jnl_seqno, read_jnl_seqno);
gtmsource_pool2file_transition = TRUE; /* so that we read the latest gener jnl files */
repl_log(gtmsource_log_fp, TRUE, TRUE, "Source server now reading from journal files; journal pool "
"overflow detected at seqno %llu [0x%llx]\n",
gtmsource_save_read_jnl_seqno, gtmsource_save_read_jnl_seqno);
/* CAUTION : FALL THROUGH */
case READ_FILE:
if (read_jnl_seqno >= jnl_seqno)
{ /* Nothing to read. While reading from files, source server does not use write_addr to decide
* how much to read. Also, it is possible that read_addr and write_addr are the same if the
* source server came up after a crash and syncs with the latest state in jnlpool (see
* gtmsource()). These reasons preclude the comparison of read_addr and write_addr (as we did for
* READ_POOL case) to decide whether there are any unsent transactions. We use jnl_seqno instead.
* Note though, the source server's view of jnl_seqno may be stale, and we may conclude that
* we don't have anything to read as we do not do memory barrier here to fetch the latest
* value of jnl_seqno. But, this will not continue forever as the source server eventually
* (decided by architecture's implementation) will see the change to jnl_seqno.
*
* On systems that allow unordered memory access, it is possible that the value of jnl_seqno
* as seen by source server is in the past compared to read_jnl_seqno - source server in
* keeping up with updaters reads (from pool) and sends upto write_addr, the last transaction
* of which could be jnl_seqno + 1. To cover the case, we use >= in the above comparison.
* Given this, we may return with "nothing to read" even though we fell through from the
* READ_POOL case.
*/
*data_len = 0;
return 0;
}
if (gtmsource_pool2file_transition /* read_pool -> read_file transition */
|| NULL == repl_ctl_list) /* files not opened */
{
/* Close all the file read related structures and start afresh. The idea here is that most of the
* file read info might be stale 'cos there is usually a long gap between pool to file read
* transitions (in production environments). So, start afresh with the latest generation journal
* files. This will also prevent opening previous generations that may not be required.
*/
REPL_DPRINT1("Pool to File read transition. Closing all the stale file read info and starting "
"afresh\n");
gtmsource_ctl_close();
gtmsource_ctl_init();
gtmsource_pool2file_transition = FALSE;
}
total_tr_len = gtmsource_readfiles(buff, data_len, maxbufflen, read_multiple);
if (GTMSOURCE_SEND_NEW_HISTINFO == gtmsource_state)
return (0); /* need to send REPL_HISTREC message before sending any more seqnos */
if (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state)
return (0); /* Connection got reset in call to "gtmsource_readfiles" */
if (0 < total_tr_len)
return (total_tr_len);
if (0 < *data_len)
return (-1);
GTMASSERT;
}
return (-1); /* This should never get executed, added to make compiler happy */
}
/* This function can be used to only send fixed-size message types across the replication pipe.
* This in turn uses REPL_SEND* macros but also does error checks and sets the global variable "gtmsource_state" accordingly.
*
* msg = Pointer to the message buffer to send
* msgtypestr = Message name as a string to display meaningful error messages
* optional_seqno = Optional seqno that needs to be printed along with the message name
*/
void gtmsource_repl_send(repl_msg_ptr_t msg, char *msgtypestr, seq_num optional_seqno, int4 optional_strm_num)
{
unsigned char *msg_ptr; /* needed for REPL_SEND_LOOP */
int tosend_len, sent_len, sent_this_iter; /* needed for REPL_SEND_LOOP */
int status; /* needed for REPL_SEND_LOOP */
char err_string[1024];
assert((REPL_MULTISITE_MSG_START > msg->type) || (REPL_PROTO_VER_MULTISITE <= remote_side->proto_ver));
if (MAX_SEQNO != optional_seqno)
{
if (INVALID_SUPPL_STRM == optional_strm_num)
repl_log(gtmsource_log_fp, TRUE, FALSE, "Sending %s message with seqno %llu [0x%llx]\n", msgtypestr,
optional_seqno, optional_seqno);
else
repl_log(gtmsource_log_fp, TRUE, FALSE, "Sending %s message with seqno %llu [0x%llx] for Stream # %2d\n",
msgtypestr, optional_seqno, optional_seqno, optional_strm_num);
} else
repl_log(gtmsource_log_fp, TRUE, FALSE, "Sending %s message\n", msgtypestr);
REPL_SEND_LOOP(gtmsource_sock_fd, msg, msg->len, FALSE, &gtmsource_poll_immediate)
{
gtmsource_poll_actions(FALSE);
if ((GTMSOURCE_CHANGING_MODE == gtmsource_state) || (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state))
return;
}
/* Check for error status from the REPL_SEND */
if (SS_NORMAL != status)
{
if (REPL_CONN_RESET(status) && EREPL_SEND == repl_errno)
{
repl_log(gtmsource_log_fp, TRUE, TRUE, "Connection reset while sending %s. Status = %d ; %s\n",
msgtypestr, status, STRERROR(status));
repl_close(&gtmsource_sock_fd);
SHORT_SLEEP(GTMSOURCE_WAIT_FOR_RECEIVER_CLOSE_CONN);
gtmsource_state = jnlpool.gtmsource_local->gtmsource_state = GTMSOURCE_WAITING_FOR_CONNECTION;
return;
}
if (EREPL_SEND == repl_errno)
{
SNPRINTF(err_string, SIZEOF(err_string), "Error sending %s message. "
"Error in send : %s", msgtypestr, STRERROR(status));
rts_error(VARLSTCNT(6) ERR_REPLCOMM, 0, ERR_TEXT, 2, RTS_ERROR_STRING(err_string));
}
if (EREPL_SELECT == repl_errno)
{
SNPRINTF(err_string, SIZEOF(err_string), "Error sending %s message. "
"Error in select : %s", msgtypestr, STRERROR(status));
rts_error(VARLSTCNT(6) ERR_REPLCOMM, 0, ERR_TEXT, 2, RTS_ERROR_STRING(err_string));
}
}
}
/* This function can be used to only receive fixed-size message types across the replication pipe.
* This in turn uses REPL_RECV* macros but also does error checks and sets the global variable "gtmsource_state" accordingly.
* While waiting for the input message type, this function also recognizes
* a) a REPL_XOFF_ACK_ME message in which case invokes "gtmsource_repl_send" to send a REPL_XOFF_ACK message type.
* b) a REPL_XOFF or REPL_XON message in which case ignores them as we are still in the initial handshake stage
* and these messages from the receiver are not relevant.
*
* msg = Pointer to the message buffer where received message will be stored
* msglen = Length of the message to receive
* msgtype = Expected type of the message (if received message is not of this type, the connection is closed)
* msgtypestr = Message name as a string to display meaningful error messages
*/
static boolean_t gtmsource_repl_recv(repl_msg_ptr_t msg, int4 msglen, int4 msgtype, char *msgtypestr)
{
unsigned char *msg_ptr; /* needed for REPL_RECV_LOOP */
int torecv_len, recvd_len, recvd_this_iter; /* needed for REPL_RECV_LOOP */
int status; /* needed for REPL_RECV_LOOP */
repl_msg_t xoff_ack;
char err_string[1024];
unsigned char *buff;
int4 bufflen;
repl_log(gtmsource_log_fp, TRUE, FALSE, "Waiting for %s message\n", msgtypestr);
assert((REPL_XOFF != msgtype) && (REPL_XON != msgtype) && (REPL_XOFF_ACK_ME != msgtype));
buff = (unsigned char *)msg;
bufflen = msglen;
do
{ /* Note that "bufflen" could potentially be > 32-byte (MIN_REPL_MSGLEN) the length of most replication
* messages, in case we are expecting to receive a REPL_CMP_SOLVE message. In that case, it is possible
* that while waiting for the 512 byte or so REPL_CMP_SOLVE message, we get a 32-byte REPL_XOFF_ACK_ME
* message. In this case, we should break out of the REPL_RECV_LOOP as otherwise we would be eternally
* waiting for a never-to-come REPL_CMP_SOLVE message. This is in fact a general issue with any REPL_RECV_LOOP
* code that passes a 3rd parameter > MIN_REPL_MSGLEN. All such usages need a check inside the body of the
* loop to account for a REPL_XOFF_ACK_ME and if so break.
*/
REPL_RECV_LOOP(gtmsource_sock_fd, buff, bufflen, FALSE, &gtmsource_poll_wait)
{
gtmsource_poll_actions(TRUE);
if ((GTMSOURCE_CHANGING_MODE == gtmsource_state) || (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state))
return FALSE;
/* Check if we received an XOFF_ACK_ME message completely. If yes, we can safely break out of the
* loop without receiving the originally intended message (as the receiver is going to drain away all
* the stuff in the replication pipe anyways and reinitiate a fresh handshake). This way we dont hang
* eternally waiting for a never-arriving originally intended message.
*/
if ((MIN_REPL_MSGLEN <= recvd_len) && (REPL_XOFF_ACK_ME == msg->type))
break;
}
if (SS_NORMAL != status)
{
if (EREPL_RECV == repl_errno)
{
if (REPL_CONN_RESET(status))
{ /* Connection reset */
repl_log(gtmsource_log_fp, TRUE, TRUE,
"Connection reset while attempting to receive %s message. Status = %d ; %s\n",
msgtypestr, status, STRERROR(status));
repl_close(&gtmsource_sock_fd);
SHORT_SLEEP(GTMSOURCE_WAIT_FOR_RECEIVER_CLOSE_CONN);
gtmsource_state = jnlpool.gtmsource_local->gtmsource_state =
GTMSOURCE_WAITING_FOR_CONNECTION;
return FALSE;
} else
{
SNPRINTF(err_string, SIZEOF(err_string),
"Error receiving %s message from Receiver. Error in recv : %s",
msgtypestr, STRERROR(status));
rts_error(VARLSTCNT(6) ERR_REPLCOMM, 0, ERR_TEXT, 2, RTS_ERROR_STRING(err_string));
}
} else if (EREPL_SELECT == repl_errno)
{
SNPRINTF(err_string, SIZEOF(err_string),
"Error receiving %s message from Receiver. Error in select : %s",
msgtypestr, STRERROR(status));
rts_error(VARLSTCNT(6) ERR_REPLCOMM, 0, ERR_TEXT, 2, RTS_ERROR_STRING(err_string));
}
}
assert(SS_NORMAL == status);
if ((REPL_XON != msg->type) && (REPL_XOFF != msg->type))
break;
/* Strip the "msg" buffer of the XON or XOFF message and redo the loop until "msglen" bytes of the expected
* msgtype are obtained. If "msglen" matches the XON or XOFF message length, then we can discard the ENTIRE message
* and redo the loop without having to strip a "partial" msg buffer. If "msglen" is LESSER than a complete XON or
* XOFF message, then we have to finish reading the entire XON/XOFF message and then redo the loop. But we
* expect all callers to set "msglen" to at least the XON/XOFF message length. Assert accordingly.
* Note that the below logic works even if more than one XON/XOFF messages get sent before the expected message.
* For every XON/XOFF message, we will do one memmove and go back in the loop to read MIN_REPL_MSGLEN-more bytes.
*/
assert(MIN_REPL_MSGLEN == msg->len);
assert(MIN_REPL_MSGLEN <= msglen);
bufflen = msg->len;
if (msglen != bufflen)
{
memmove(msg, (uchar_ptr_t)msg + bufflen, msglen - bufflen);
buff = (uchar_ptr_t)msg + msglen - bufflen;
}
} while (TRUE);
/* Check if message received is indeed of expected type */
assert(remote_side->endianness_known); /* so we ensure msg->type we read below is accurate and not cross-endian format */
if (REPL_XOFF_ACK_ME == msg->type)
{ /* Send XOFF_ACK. Anything sent before this in the replication pipe will be drained and therefore
* return to the caller so it can reissue the message exchange sequence right from the beginning.
*/
repl_log(gtmsource_log_fp, TRUE, FALSE, "Received REPL_XOFF_ACK_ME message\n", msgtypestr);
xoff_ack.type = REPL_XOFF_ACK;
memcpy((uchar_ptr_t)&xoff_ack.msg[0], (uchar_ptr_t)&gtmsource_msgp->msg[0], SIZEOF(seq_num));
xoff_ack.len = MIN_REPL_MSGLEN;
gtmsource_repl_send((repl_msg_ptr_t)&xoff_ack, "REPL_XOFF_ACK", MAX_SEQNO, INVALID_SUPPL_STRM);
if ((GTMSOURCE_CHANGING_MODE == gtmsource_state) || (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state))
return FALSE; /* "gtmsource_repl_send" did not complete */
gtmsource_state = jnlpool.gtmsource_local->gtmsource_state = GTMSOURCE_WAITING_FOR_RESTART;
return FALSE;
} else if (msgtype != msg->type)
{
assert(FALSE);
repl_log(gtmsource_log_fp, TRUE, FALSE, "UNKNOWN msg (type = %d) received when waiting for msg (type = %d)"
". Closing connection.\n", msg->type, msgtype);
repl_close(&gtmsource_sock_fd);
SHORT_SLEEP(GTMSOURCE_WAIT_FOR_RECEIVER_CLOSE_CONN);
gtmsource_state = jnlpool.gtmsource_local->gtmsource_state = GTMSOURCE_WAITING_FOR_CONNECTION;
return FALSE;
} else
{
repl_log(gtmsource_log_fp, TRUE, FALSE, "Received %s message\n", msgtypestr);
return TRUE;
}
}
/* Given that the source server was started with compression enabled, this function checks if the receiver server was
* also started with the decompression enabled and if so sends a compressed test message. The receiver server responds back
* whether it is successfully able to decompress that or not. If yes, compression is enabled on the replication pipe and
* the input parameter "*repl_zlib_cmp_level_ptr" is set to the compression level used.
*/
boolean_t gtmsource_get_cmp_info(int4 *repl_zlib_cmp_level_ptr)
{
repl_cmpinfo_msg_t test_msg, solve_msg;
char inputdata[REPL_MSG_CMPDATALEN], cmpbuf[REPL_MSG_CMPEXPDATALEN];
int index, cmpret, start;
boolean_t cmpfail;
uLongf cmplen;
DCL_THREADGBL_ACCESS;
SETUP_THREADGBL_ACCESS;
assert(gtm_zlib_cmp_level);
/*************** Send REPL_CMP_TEST message ***************/
memset(&test_msg, 0, SIZEOF(test_msg));
test_msg.type = REPL_CMP_TEST;
test_msg.len = REPL_MSG_CMPINFOLEN;
test_msg.proto_ver = REPL_PROTO_VER_THIS;
/* Fill in test data with random data. The data will be a sequence of bytes from 0 to 255. The start point though
* is randomly chosen using the process_id. If it is 253, the resulting sequence would be 253, 254, 255, 0, 1, 2, ...
*/
for (start = (process_id & REPL_MSG_CMPDATAMASK), index = 0; index < REPL_MSG_CMPDATALEN; index++)
inputdata[index] = (start + index) % REPL_MSG_CMPDATALEN;
/* Compress the data */
cmplen = SIZEOF(cmpbuf); /* initialize it to the available compressed buffer space */
ZLIB_COMPRESS(&cmpbuf[0], cmplen, inputdata, REPL_MSG_CMPDATALEN, gtm_zlib_cmp_level, cmpret);
switch(cmpret)
{
case Z_MEM_ERROR:
assert(FALSE);
repl_log(gtmsource_log_fp, TRUE, FALSE,
"Out-of-memory; Error from zlib compress function before sending REPL_CMP_TEST message\n");
break;
case Z_BUF_ERROR:
assert(FALSE);
repl_log(gtmsource_log_fp, TRUE, FALSE, "Insufficient output buffer; Error from zlib compress "
"function before sending REPL_CMP_TEST message\n");
break;
case Z_STREAM_ERROR:
/* level was incorrectly specified. Default to NO compression. */
repl_log(gtmsource_log_fp, TRUE, FALSE, "Compression level %d invalid; Error from compress function"
" before sending REPL_CMP_TEST message\n", gtm_zlib_cmp_level);
break;
}
if (Z_OK != cmpret)
{
repl_log(gtmsource_log_fp, TRUE, FALSE, "Defaulting to NO compression\n");
*repl_zlib_cmp_level_ptr = ZLIB_CMPLVL_NONE; /* no compression */
return TRUE;
}
if (REPL_MSG_CMPEXPDATALEN < cmplen)
{ /* The zlib compression library expanded data more than we had allocated for. But handle code for pro */
assert(FALSE);
repl_log(gtmsource_log_fp, TRUE, FALSE, "Compression of %d bytes of test data resulted in %d bytes which is"
" more than allocated space of %d bytes\n", REPL_MSG_CMPDATALEN, cmplen, REPL_MSG_CMPEXPDATALEN);
repl_log(gtmsource_log_fp, TRUE, FALSE, "Defaulting to NO compression\n");
*repl_zlib_cmp_level_ptr = ZLIB_CMPLVL_NONE; /* no compression */
return TRUE;
}
test_msg.datalen = (int4)cmplen;
memcpy(test_msg.data, cmpbuf, test_msg.datalen);
gtmsource_repl_send((repl_msg_ptr_t)&test_msg, "REPL_CMP_TEST", MAX_SEQNO, INVALID_SUPPL_STRM);
if ((GTMSOURCE_CHANGING_MODE == gtmsource_state) || (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state))
return FALSE; /* send did not succeed */
/* for in-house testing, set up a timer that would cause assert failure if REPL_CMP_SOLVE is not received
* within 1 or 15 minutes, depending on whether this is a white-box test or not */
# ifdef REPL_CMP_SOLVE_TESTING
if (TREF(gtm_environment_init))
start_timer((TID)repl_cmp_solve_src_timeout, 15 * 60 * 1000, repl_cmp_solve_src_timeout, 0, NULL);
# endif
/*************** Receive REPL_CMP_SOLVE message ***************/
if (!gtmsource_repl_recv((repl_msg_ptr_t)&solve_msg, REPL_MSG_CMPINFOLEN, REPL_CMP_SOLVE, "REPL_CMP_SOLVE"))
{
# ifdef REPL_CMP_SOLVE_TESTING
if (TREF(gtm_environment_init))
cancel_timer((TID)repl_cmp_solve_src_timeout);
# endif
return FALSE; /* recv did not succeed */
}
# ifdef REPL_CMP_SOLVE_TESTING
if (TREF(gtm_environment_init))
cancel_timer((TID)repl_cmp_solve_src_timeout);
# endif
assert(REPL_CMP_SOLVE == solve_msg.type);
cmpfail = FALSE;
if (REPL_MSG_CMPDATALEN != solve_msg.datalen)
{
assert(REPL_RCVR_CMP_TEST_FAIL == solve_msg.datalen);
cmpfail = TRUE;
} else
{ /* Check that receiver side decompression was correct */
for (index = 0; index < REPL_MSG_CMPDATALEN; index++)
{
if (inputdata[index] != solve_msg.data[index])
{
cmpfail = FALSE;
break;
}
}
}
if (!cmpfail)
{
assert(solve_msg.proto_ver == remote_side->proto_ver);
assert(REPL_PROTO_VER_MULTISITE_CMP <= solve_msg.proto_ver);
repl_log(gtmsource_log_fp, TRUE, FALSE, "Receiver server was able to decompress successfully\n");
*repl_zlib_cmp_level_ptr = gtm_zlib_cmp_level;
repl_log(gtmsource_log_fp, TRUE, FALSE, "Using zlib compression level %d for replication\n", gtm_zlib_cmp_level);
} else
{
repl_log(gtmsource_log_fp, TRUE, FALSE, "Receiver server could not decompress successfully\n");
repl_log(gtmsource_log_fp, TRUE, FALSE, "Defaulting to NO compression\n");
*repl_zlib_cmp_level_ptr = ZLIB_CMPLVL_NONE;
}
return TRUE;
}
void repl_cmp_solve_src_timeout(void)
{
GTMASSERT;
}
/* Note: Do NOT reset input parameter "strm_jnl_seqno" unless receiver instance is supplementary and root primary */
boolean_t gtmsource_get_instance_info(boolean_t *secondary_was_rootprimary, seq_num *strm_jnl_seqno)
{
char print_msg[1024];
gtmsource_local_ptr_t gtmsource_local;
int status;
repl_instinfo_msg_t instinfo_msg;
repl_needinst_msg_t needinst_msg;
repl_old_instinfo_msg_t old_instinfo_msg;
repl_old_needinst_msg_t old_needinst_msg;
gtmsource_local = jnlpool.gtmsource_local;
assert(NULL != jnlpool.repl_inst_filehdr); /* journal pool should be set up */
assert(REPL_PROTO_VER_MULTISITE <= remote_side->proto_ver);
if (REPL_PROTO_VER_SUPPLEMENTARY > remote_side->proto_ver)
{ /* Use pre-supplementary protocol to communicate */
/*************** Send REPL_OLD_NEED_INSTANCE_INFO message ***************/
memset(&old_needinst_msg, 0, SIZEOF(old_needinst_msg));
old_needinst_msg.type = REPL_OLD_NEED_INSTANCE_INFO;
old_needinst_msg.len = MIN_REPL_MSGLEN;
memcpy(old_needinst_msg.instname, jnlpool.repl_inst_filehdr->inst_info.this_instname, MAX_INSTNAME_LEN - 1);
old_needinst_msg.proto_ver = REPL_PROTO_VER_THIS;
old_needinst_msg.node_endianness = NODE_ENDIANNESS;
old_needinst_msg.is_rootprimary = !(jnlpool.jnlpool_ctl->upd_disabled);
gtmsource_repl_send((repl_msg_ptr_t)&old_needinst_msg, "REPL_OLD_NEED_INSTANCE_INFO",
MAX_SEQNO, INVALID_SUPPL_STRM);
if ((GTMSOURCE_CHANGING_MODE == gtmsource_state) || (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state))
return FALSE; /* send did not succeed */
/*************** Receive REPL_OLD_INSTANCE_INFO message ***************/
if (!gtmsource_repl_recv((repl_msg_ptr_t)&old_instinfo_msg, MIN_REPL_MSGLEN,
REPL_OLD_INSTANCE_INFO, "REPL_OLD_INSTANCE_INFO"))
return FALSE; /* recv did not succeed */
assert(REPL_OLD_INSTANCE_INFO == old_instinfo_msg.type);
repl_log(gtmsource_log_fp, TRUE, FALSE, "Received secondary instance name is [%s]\n", old_instinfo_msg.instname);
if (jnlpool.repl_inst_filehdr->is_supplementary)
{ /* Issue REPL2OLD error because this is a supplementary instance and remote side runs
* on a GT.M version that does not understand the supplementary protocol */
rts_error(VARLSTCNT(6) ERR_REPL2OLD, 4, LEN_AND_STR(old_instinfo_msg.instname),
LEN_AND_STR(jnlpool.repl_inst_filehdr->inst_info.this_instname));
}
/* Check if instance name in the REPL_OLD_INSTANCE_INFO message matches that in the source server command line */
if (STRCMP(old_instinfo_msg.instname, jnlpool.gtmsource_local->secondary_instname))
{ /* Instance name obtained from the receiver does not match what was specified in the
* source server command line. Issue error.
*/
sgtm_putmsg(print_msg, VARLSTCNT(6) ERR_REPLINSTSECMTCH, 4,
LEN_AND_STR(old_instinfo_msg.instname), LEN_AND_STR(jnlpool.gtmsource_local->secondary_instname));
repl_log(gtmsource_log_fp, TRUE, TRUE, print_msg);
status = gtmsource_shutdown(TRUE, NORMAL_SHUTDOWN) - NORMAL_SHUTDOWN;
gtmsource_exit(status);
}
*secondary_was_rootprimary = (boolean_t)old_instinfo_msg.was_rootprimary;
} else
{ /* Use supplementary protocol to communicate */
/*************** Send REPL_NEED_INSTINFO message ***************/
memset(&needinst_msg, 0, SIZEOF(needinst_msg));
needinst_msg.type = REPL_NEED_INSTINFO;
needinst_msg.len = SIZEOF(needinst_msg);
memcpy(needinst_msg.instname, jnlpool.repl_inst_filehdr->inst_info.this_instname, MAX_INSTNAME_LEN - 1);
needinst_msg.lms_group_info = jnlpool.repl_inst_filehdr->lms_group_info;
/* Need to byteswap a few multi-byte fields to take into account the receiver endianness */
assert(remote_side->endianness_known); /* only then is remote_side->cross_endian reliable */
if (remote_side->cross_endian && (this_side->jnl_ver < remote_side->jnl_ver))
ENDIAN_CONVERT_REPL_INST_UUID(&needinst_msg.lms_group_info);
needinst_msg.proto_ver = REPL_PROTO_VER_THIS;
needinst_msg.is_rootprimary = !(jnlpool.jnlpool_ctl->upd_disabled);
needinst_msg.is_supplementary = jnlpool.repl_inst_filehdr->is_supplementary;
gtmsource_repl_send((repl_msg_ptr_t)&needinst_msg, "REPL_NEED_INSTINFO", MAX_SEQNO, INVALID_SUPPL_STRM);
if ((GTMSOURCE_CHANGING_MODE == gtmsource_state) || (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state))
return FALSE; /* send did not succeed */
/*************** Receive REPL_INSTINFO message ***************/
if (!gtmsource_repl_recv((repl_msg_ptr_t)&instinfo_msg, SIZEOF(repl_instinfo_msg_t),
REPL_INSTINFO, "REPL_INSTINFO"))
return FALSE; /* recv did not succeed */
assert(REPL_INSTINFO == instinfo_msg.type);
repl_log(gtmsource_log_fp, TRUE, FALSE, "Received secondary instance name is [%s]\n", instinfo_msg.instname);
if (!remote_side->is_supplementary)
{ /* Remote side is non-supplementary */
if (jnlpool.repl_inst_filehdr->is_supplementary)
{ /* Issue SECNOTSUPPLEMENTARY error because this is a supplementary primary and secondary
* is not a supplementary instance.
*/
rts_error(VARLSTCNT(6) ERR_SECNOTSUPPLEMENTARY, 4,
LEN_AND_STR(jnlpool.repl_inst_filehdr->inst_info.this_instname),
LEN_AND_STR(instinfo_msg.instname));
}
} else if (!jnlpool.repl_inst_filehdr->is_supplementary)
{ /* Remote side is supplementary and Local side is non-supplementary.
* The REPL_INSTINFO message would have a non-zero "strm_jnl_seqno" field.
* Pass it back on to the caller.
*/
assert(instinfo_msg.strm_jnl_seqno);
assert(0 == GET_STRM_INDEX(instinfo_msg.strm_jnl_seqno));
repl_log(gtmsource_log_fp, TRUE, TRUE, "Received Stream Seqno = %llu [0x%llx]\n",
instinfo_msg.strm_jnl_seqno, instinfo_msg.strm_jnl_seqno);
*strm_jnl_seqno = instinfo_msg.strm_jnl_seqno;
}
/* Check if instance name in the REPL_INSTINFO message matches that in the source server command line */
if (STRCMP(instinfo_msg.instname, jnlpool.gtmsource_local->secondary_instname))
{ /* Instance name obtained from the receiver does not match what was specified in the
* source server command line. Issue error.
*/
sgtm_putmsg(print_msg, VARLSTCNT(6) ERR_REPLINSTSECMTCH, 4,
LEN_AND_STR(instinfo_msg.instname), LEN_AND_STR(jnlpool.gtmsource_local->secondary_instname));
repl_log(gtmsource_log_fp, TRUE, TRUE, print_msg);
status = gtmsource_shutdown(TRUE, NORMAL_SHUTDOWN) - NORMAL_SHUTDOWN;
gtmsource_exit(status);
}
*secondary_was_rootprimary = (boolean_t)instinfo_msg.was_rootprimary;
}
return TRUE;
}
/* Given an input "seqno", this function locates the histinfo record from the receiver that corresponds to "seqno - 1"
*
* seqno --> The journal seqno that is to be searched in the instance file history.
* histinfo --> Pointer to the histinfo structure that is filled in with what was received.
*/
boolean_t gtmsource_get_remote_histinfo(seq_num seqno, repl_histinfo *histinfo)
{
char err_string[1024];
repl_histinfo1_msg_t histinfo1_msg;
repl_histinfo2_msg_t histinfo2_msg;
repl_histinfo_msg_t histinfo_msg;
repl_needhistinfo_msg_t needhistinfo_msg;
/*************** Send REPL_NEED_HISTINFO (formerly REPL_NEED_TRIPLE_INFO) message ***************/
memset(&needhistinfo_msg, 0, SIZEOF(needhistinfo_msg));
assert(SIZEOF(needhistinfo_msg) == MIN_REPL_MSGLEN);
needhistinfo_msg.type = REPL_NEED_HISTINFO;
needhistinfo_msg.len = MIN_REPL_MSGLEN;
needhistinfo_msg.seqno = seqno;
needhistinfo_msg.strm_num = strm_index;
needhistinfo_msg.histinfo_num = INVALID_HISTINFO_NUM;
gtmsource_repl_send((repl_msg_ptr_t)&needhistinfo_msg, "REPL_NEED_HISTINFO", seqno, needhistinfo_msg.strm_num);
if ((GTMSOURCE_CHANGING_MODE == gtmsource_state) || (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state))
return FALSE; /* send did not succeed */
if (REPL_PROTO_VER_SUPPLEMENTARY > remote_side->proto_ver)
{ /* Remote side does not support supplementary protocol. Use older protocol messages to communicate. */
assert(REPL_PROTO_VER_MULTISITE <= remote_side->proto_ver);
/*************** Receive REPL_OLD_TRIPLEINFO1 message ***************/
if (!gtmsource_repl_recv((repl_msg_ptr_t)&histinfo1_msg, MIN_REPL_MSGLEN,
REPL_OLD_TRIPLEINFO1, "REPL_OLD_TRIPLEINFO1"))
return FALSE; /* recv did not succeed */
assert(REPL_OLD_TRIPLEINFO1 == histinfo1_msg.type);
/*************** Receive REPL_OLD_TRIPLEINFO2 message ***************/
if (!gtmsource_repl_recv((repl_msg_ptr_t)&histinfo2_msg, MIN_REPL_MSGLEN,
REPL_OLD_TRIPLEINFO2, "REPL_OLD_TRIPLEINFO2"))
return FALSE; /* recv did not succeed */
assert(REPL_OLD_TRIPLEINFO2 == histinfo2_msg.type);
/* Check if start_seqno in HISTINFO1 and HISTINFO2 message is the same. If not something is wrong */
if (histinfo1_msg.start_seqno != histinfo2_msg.start_seqno)
{
assert(FALSE);
repl_log(gtmsource_log_fp, TRUE, FALSE, "REPL_OLD_TRIPLEINFO1 msg has start_seqno %llu [0x%llx] while "
"corresponding REPL_OLD_TRIPLEINFO2 msg has a different start_seqno %llu [0x%llx]. "
"Closing connection.\n", histinfo1_msg.start_seqno, histinfo1_msg.start_seqno,
histinfo2_msg.start_seqno, histinfo2_msg.start_seqno);
repl_close(&gtmsource_sock_fd);
SHORT_SLEEP(GTMSOURCE_WAIT_FOR_RECEIVER_CLOSE_CONN);
gtmsource_state = jnlpool.gtmsource_local->gtmsource_state = GTMSOURCE_WAITING_FOR_CONNECTION;
return FALSE;
}
memset(histinfo, 0, SIZEOF(*histinfo));
memcpy(histinfo->root_primary_instname, histinfo1_msg.instname, MAX_INSTNAME_LEN - 1);
histinfo->start_seqno = histinfo1_msg.start_seqno;
histinfo->root_primary_cycle = histinfo2_msg.cycle;
histinfo->histinfo_num = histinfo2_msg.histinfo_num;
} else
{ /* Remote side does support supplementary protocol. Use newer protocol messages to communicate. */
/*************** Receive REPL_HISTINFO message ***************/
if (!gtmsource_repl_recv((repl_msg_ptr_t)&histinfo_msg, SIZEOF(repl_histinfo_msg_t),
REPL_HISTINFO, "REPL_HISTINFO"))
return FALSE; /* recv did not succeed */
assert(REPL_HISTINFO == histinfo_msg.type);
*histinfo = histinfo_msg.history;
}
assert(0 <= histinfo->histinfo_num);
return TRUE;
}
/* Given an input "seqno", this function determines how many non-supplementary streams are known to the receiver server
* as of instance jnl_seqno = "seqno". It then compares if the source side list of known streams are identical. For each
* such stream, exchange and verify the stream-specific history record corresponding to "seqno" is the same.
*
* seqno --> The journal seqno that is to be searched in the instance file history.
* *rollback_first --> Set to TRUE if we find some stream history record not matching between the source & receiver side.
*/
boolean_t gtmsource_check_remote_strm_histinfo(seq_num seqno, boolean_t *rollback_first)
{
boolean_t lcl_strm_valid, remote_strm_valid;
int4 lcl_histinfo_num;
char err_string[1024];
int idx, status;
repl_histinfo local_histinfo;
repl_histinfo_msg_t histinfo_msg;
repl_needhistinfo_msg_t needhistinfo_msg;
repl_needstrminfo_msg_t needstrminfo_msg;
repl_strminfo_msg_t strminfo_msg;
assert(remote_side->is_supplementary);
assert(REPL_PROTO_VER_SUPPLEMENTARY <= remote_side->proto_ver);
assert(0 == strm_index);
assert(FALSE == *rollback_first);
/*************** Send REPL_NEED_STRMINFO message ***************/
memset(&needstrminfo_msg, 0, SIZEOF(needstrminfo_msg));
assert(SIZEOF(needstrminfo_msg) == MIN_REPL_MSGLEN);
needstrminfo_msg.type = REPL_NEED_STRMINFO;
needstrminfo_msg.len = MIN_REPL_MSGLEN;
needstrminfo_msg.seqno = seqno;
gtmsource_repl_send((repl_msg_ptr_t)&needstrminfo_msg, "REPL_NEED_STRMINFO", seqno, INVALID_SUPPL_STRM);
if ((GTMSOURCE_CHANGING_MODE == gtmsource_state) || (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state))
return FALSE; /* send did not succeed */
/*************** Receive REPL_STRMINFO message ***************/
if (!gtmsource_repl_recv((repl_msg_ptr_t)&strminfo_msg, SIZEOF(repl_strminfo_msg_t), REPL_STRMINFO, "REPL_STRMINFO"))
return FALSE; /* recv did not succeed */
assert(REPL_STRMINFO == strminfo_msg.type);
/* Verify that the list of known streams is identical on both sides */
GRAB_LOCK(jnlpool.jnlpool_dummy_reg, HANDLE_CONCUR_ONLINE_ROLLBACK);
if (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state)
return FALSE; /* concurrent online rollback happened */
status = repl_inst_histinfo_find_seqno(seqno, INVALID_SUPPL_STRM, &local_histinfo);
rel_lock(jnlpool.jnlpool_dummy_reg);
assert(0 == status); /* we are guaranteed to find this since we have already verified 0th stream matches */
/* Fix last_histinfo_num[] in local side to include "local_histinfo" too (which could have strm_index > 0) */
if (0 < local_histinfo.strm_index)
{
assert(local_histinfo.last_histinfo_num[local_histinfo.strm_index] < local_histinfo.histinfo_num);
local_histinfo.last_histinfo_num[local_histinfo.strm_index] = local_histinfo.histinfo_num;
}
/* Skip 0th stream as it has already been verified */
for (idx = 1; idx < MAX_SUPPL_STRMS; idx++)
{
lcl_strm_valid = (INVALID_HISTINFO_NUM != local_histinfo.last_histinfo_num[idx]);
remote_strm_valid = (INVALID_HISTINFO_NUM != strminfo_msg.last_histinfo_num[idx]);
if (!lcl_strm_valid && remote_strm_valid)
rts_error(VARLSTCNT(3) ERR_STRMNUMMISMTCH1, 1, idx);
else if (lcl_strm_valid && !remote_strm_valid)
rts_error(VARLSTCNT(3) ERR_STRMNUMMISMTCH2, 1, idx);
}
/* Now that we know both sides have the exact set of known streams, verify history record for each stream matches */
/* Send REPL_NEED_HISTINFO message for each stream. Do common initialization outside loop */
memset(&needhistinfo_msg, 0, SIZEOF(needhistinfo_msg));
assert(SIZEOF(needhistinfo_msg) == MIN_REPL_MSGLEN);
needhistinfo_msg.type = REPL_NEED_HISTINFO;
needhistinfo_msg.len = MIN_REPL_MSGLEN;
/* No need to initialize the following as needhistinfo_msg.histinfo_num will override those.
* needhistinfo_msg.seqno = ...
*/
for (idx = 1; idx < MAX_SUPPL_STRMS; idx++)
{
lcl_histinfo_num = local_histinfo.last_histinfo_num[idx];
if (INVALID_HISTINFO_NUM == lcl_histinfo_num)
continue;
/* Find corresponding history record on REMOTE side */
assert(INVALID_HISTINFO_NUM != strminfo_msg.last_histinfo_num[idx]);
needhistinfo_msg.histinfo_num = strminfo_msg.last_histinfo_num[idx];
needhistinfo_msg.strm_num = idx;
/*************** Send REPL_NEED_HISTINFO message ***************/
gtmsource_repl_send((repl_msg_ptr_t)&needhistinfo_msg, "REPL_NEED_HISTINFO", seqno, needhistinfo_msg.strm_num);
if ((GTMSOURCE_CHANGING_MODE == gtmsource_state) || (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state))
return FALSE; /* send did not succeed */
/*************** Receive REPL_HISTINFO message ***************/
if (!gtmsource_repl_recv((repl_msg_ptr_t)&histinfo_msg, SIZEOF(repl_histinfo_msg_t),
REPL_HISTINFO, "REPL_HISTINFO"))
return FALSE; /* recv did not succeed */
assert(REPL_HISTINFO == histinfo_msg.type);
/* Find corresponding history record on LOCAL side */
GRAB_LOCK(jnlpool.jnlpool_dummy_reg, HANDLE_CONCUR_ONLINE_ROLLBACK);
if (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state)
return FALSE; /* concurrent online rollback happened */
status = repl_inst_histinfo_get(lcl_histinfo_num, &local_histinfo);
assert(0 == status); /* Since we pass histinfo_num of 0 which is >=0 and < num_histinfo */
rel_lock(jnlpool.jnlpool_dummy_reg);
/* Compare the two history records. If they are not identical for even one stream, signal rollback on receiver */
if (!gtmsource_is_histinfo_identical(&histinfo_msg.history, &local_histinfo, seqno, OK_TO_LOG_TRUE))
*rollback_first = TRUE;
}
return TRUE;
}
/* This function finds the 'n'th histinfo record in the replication instance file of this instance.
* This is a wrapper on top of "repl_inst_histinfo_get" which additionally does error checking.
* This closes the connection if it detects a REPLINSTNOHIST error.
*/
void gtmsource_histinfo_get(int4 index, repl_histinfo *histinfo)
{
unix_db_info *udi;
char histdetail[256];
int4 status;
repl_msg_t instnohist_msg;
udi = FILE_INFO(jnlpool.jnlpool_dummy_reg);
assert(udi->s_addrs.now_crit);
assert(NULL != jnlpool.repl_inst_filehdr); /* journal pool should be set up */
status = repl_inst_histinfo_get(index, histinfo);
assert((0 == status) || (INVALID_HISTINFO_NUM == index));
assert((0 != status) || (index == histinfo->histinfo_num));
if (0 != status)
{
assert(ERR_REPLINSTNOHIST == status); /* currently the only error returned by "repl_inst_histinfo_get" */
SPRINTF(histdetail, "record number %d [0x%x]", index, index);
gtm_putmsg(VARLSTCNT(6) ERR_REPLINSTNOHIST, 4, LEN_AND_STR(histdetail), LEN_AND_STR(udi->fn));
/* Send this error status to the receiver server before closing the connection. This way the receiver
* will know to shut down rather than loop back trying to reconnect. This avoids an infinite loop of
* connection open and closes between the source server and receiver server.
*/
instnohist_msg.type = REPL_INST_NOHIST;
instnohist_msg.len = MIN_REPL_MSGLEN;
memset(&instnohist_msg.msg[0], 0, SIZEOF(instnohist_msg.msg));
gtmsource_repl_send((repl_msg_ptr_t)&instnohist_msg, "REPL_INST_NOHIST", MAX_SEQNO, INVALID_SUPPL_STRM);
/* Close the connection */
repl_log(gtmsource_log_fp, TRUE, TRUE, "Connection reset due to above REPLINSTNOHIST error\n");
repl_close(&gtmsource_sock_fd);
SHORT_SLEEP(GTMSOURCE_WAIT_FOR_RECEIVER_CLOSE_CONN);
gtmsource_state = jnlpool.gtmsource_local->gtmsource_state = GTMSOURCE_WAITING_FOR_CONNECTION;
}
}
/* Given two histinfo records (one each from the primary and secondary) corresponding to the seqno "jnl_seqno-1", this function
* compares if the history information in both histinfo records is the same. If so, it returns TRUE else it returns FALSE.
*/
boolean_t gtmsource_is_histinfo_identical(repl_histinfo *remote_histinfo, repl_histinfo *local_histinfo,
seq_num jnl_seqno, boolean_t ok_to_log)
{
if (ok_to_log)
{
repl_log(gtmsource_log_fp, FALSE, FALSE, "On Secondary, seqno %llu [0x%llx] generated by instance name [%s] "
"with cycle [0x%x] (%d), pid = %d : time = %d [0x%x]\n", jnl_seqno - 1, jnl_seqno - 1,
remote_histinfo->root_primary_instname, remote_histinfo->root_primary_cycle,
remote_histinfo->root_primary_cycle, remote_histinfo->creator_pid,
remote_histinfo->created_time, remote_histinfo->created_time);
repl_log(gtmsource_log_fp, FALSE, FALSE, "On Primary, seqno %llu [0x%llx] generated by instance name [%s] "
"with cycle [0x%x] (%d), pid = %d : time = %d [0x%x]\n", jnl_seqno - 1, jnl_seqno - 1,
local_histinfo->root_primary_instname, local_histinfo->root_primary_cycle,
local_histinfo->root_primary_cycle, local_histinfo->creator_pid,
local_histinfo->created_time, local_histinfo->created_time);
}
/* Starting with the version of GT.M that supports supplementary instances, we check for a lot more pieces of the history.
* But if remote side is an older version that does not support the supplementary protocol, check only those pieces
* which were checked previously.
*/
if (STRCMP(local_histinfo->root_primary_instname, remote_histinfo->root_primary_instname)
|| (local_histinfo->root_primary_cycle != remote_histinfo->root_primary_cycle)
|| ((REPL_PROTO_VER_SUPPLEMENTARY <= remote_side->proto_ver)
&& ((local_histinfo->creator_pid != remote_histinfo->creator_pid)
|| (local_histinfo->created_time != remote_histinfo->created_time))))
{ /* either the root primary instance name or the cycle did not match */
if (ok_to_log)
repl_log(gtmsource_log_fp, FALSE, FALSE, "Primary and Secondary have DIFFERING history records for "
"seqno %llu [0x%llx]\n", jnl_seqno - 1, jnl_seqno - 1);
return FALSE;
} else
{
if (ok_to_log)
repl_log(gtmsource_log_fp, FALSE, FALSE, "Primary and Secondary have IDENTICAL history records for "
"seqno %llu [0x%llx]\n", jnl_seqno - 1, jnl_seqno - 1);
return TRUE;
}
}
/* Determine the resync seqno between primary and secondary by comparing local and remote histinfo records from the tail of the
* respective instance files until we reach a seqno whose histinfo record information is identical in both. The resync seqno
* is the first seqno whose histinfo record information was NOT identical in both. The histinfo records on the secondary are
* obtained through successive calls to the function "gtmsource_get_remote_histinfo".
*
* If the connection gets reset while exchanging histinfo records with secondary, this function returns a seqno of MAX_SEQNO.
* The global variable "gtmsource_state" will be set to GTMSOURCE_CHANGING_MODE or GTMSOURCE_WAITING_FOR_CONNECTION and the
* caller of this function should accordingly check for that immediately on return.
*/
seq_num gtmsource_find_resync_seqno(repl_histinfo *local_histinfo, repl_histinfo *remote_histinfo)
{
seq_num max_start_seqno, local_start_seqno, remote_start_seqno;
int4 local_histinfo_num;
DEBUG_ONLY(int4 prev_remote_histinfo_num;)
DEBUG_ONLY(sgmnt_addrs *csa;)
DEBUG_ONLY(seq_num min_start_seqno;)
assert((NULL != jnlpool.jnlpool_dummy_reg) && jnlpool.jnlpool_dummy_reg->open);
DEBUG_ONLY(
csa = &FILE_INFO(jnlpool.jnlpool_dummy_reg)->s_addrs;
ASSERT_VALID_JNLPOOL(csa);
)
DEBUG_ONLY(prev_remote_histinfo_num = remote_histinfo->prev_histinfo_num;)
do
{
local_start_seqno = local_histinfo->start_seqno;
remote_start_seqno = remote_histinfo->start_seqno;
assert(local_start_seqno);
assert(remote_start_seqno);
max_start_seqno = MAX(local_start_seqno, remote_start_seqno);
/* "max_start_seqno" is the largest yet known seqno whose histinfo does NOT match between primary and secondary.
* Therefore determine the histinfo(s) for "max_start_seqno-1" in primary and/or secondary and compare them.
*/
if (1 == max_start_seqno)
{ /* The earliest possible seqno in the primary is out of sync with that of the secondary. Stop the histinfo
* search right away and return with 1 (the smallest possible seqno) as the resync seqno.
*/
assert(local_start_seqno == max_start_seqno);
assert(remote_start_seqno == max_start_seqno);
assert(INVALID_HISTINFO_NUM == local_histinfo->prev_histinfo_num);
assert(INVALID_HISTINFO_NUM == remote_histinfo->prev_histinfo_num);
break;
}
DEBUG_ONLY(min_start_seqno = MIN(local_start_seqno, remote_start_seqno);)
assert(!gtmsource_is_histinfo_identical(remote_histinfo, local_histinfo, min_start_seqno, OK_TO_LOG_FALSE));
if (local_start_seqno == max_start_seqno)
{ /* Need to get the previous histinfo record on the primary */
local_histinfo_num = local_histinfo->prev_histinfo_num;
assert(0 <= local_histinfo->histinfo_num);
GRAB_LOCK(jnlpool.jnlpool_dummy_reg, HANDLE_CONCUR_ONLINE_ROLLBACK);
if (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state)
return MAX_SEQNO;
gtmsource_histinfo_get(local_histinfo_num, local_histinfo);
rel_lock(jnlpool.jnlpool_dummy_reg);
if (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state)
return MAX_SEQNO; /* Connection got reset in "gtmsource_histinfo_get" due to REPLINSTNOHIST */
}
if (remote_start_seqno == max_start_seqno)
{ /* Need to get the previous histinfo record on the secondary */
assert(0 <= prev_remote_histinfo_num);
if (!gtmsource_get_remote_histinfo(remote_start_seqno, remote_histinfo))
{
assert((GTMSOURCE_CHANGING_MODE == gtmsource_state)
|| (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state));
/* Connection got reset while exchanging histinfo history information */
return MAX_SEQNO;
}
assert(remote_histinfo->histinfo_num == prev_remote_histinfo_num);
DEBUG_ONLY(prev_remote_histinfo_num = remote_histinfo->prev_histinfo_num);
}
assert(local_histinfo->start_seqno < max_start_seqno);
assert(remote_histinfo->start_seqno < max_start_seqno);
} while (!gtmsource_is_histinfo_identical(remote_histinfo, local_histinfo, max_start_seqno - 1, OK_TO_LOG_TRUE));
repl_log(gtmsource_log_fp, TRUE, FALSE, "Resync Seqno determined is %llu [0x%llx]\n", max_start_seqno, max_start_seqno);
/* "max_start_seqno-1" has same histinfo info in both primary and secondary. Hence "max_start_seqno" is the resync seqno. */
return max_start_seqno;
}
/* This routine sends a REPL_HISTREC message for the histinfo record corresponding to seqno "gtmsource_local->read_jnl_seqno".
* It positions the send to that histinfo record which corresponds to "gtmsource_local->read_jnl_seqno". This is done by
* a call to the function "gtmsource_set_next_histinfo_seqno".
* On return from this routine, the caller should check the value of the global variable "gtmsource_state" to see if it is
* either of GTMSOURCE_CHANGING_MODE or GTMSOURCE_WAITING_FOR_CONNECTION and if so take appropriate action.
*/
void gtmsource_send_new_histrec()
{
repl_histinfo histinfo, zero_histinfo;
repl_old_triple_msg_t oldtriple_msg;
gtmsource_local_ptr_t gtmsource_local;
boolean_t first_histrec_send;
int4 zero_histinfo_num;
DEBUG_ONLY(sgmnt_addrs *csa;)
assert((NULL != jnlpool.jnlpool_dummy_reg) && jnlpool.jnlpool_dummy_reg->open);
DEBUG_ONLY(
csa = &FILE_INFO(jnlpool.jnlpool_dummy_reg)->s_addrs;
ASSERT_VALID_JNLPOOL(csa);
)
gtmsource_local = jnlpool.gtmsource_local;
assert(gtmsource_local->send_new_histrec);
assert(gtmsource_local->read_jnl_seqno <= gtmsource_local->next_histinfo_seqno);
first_histrec_send = (-1 == gtmsource_local->next_histinfo_num);
gtmsource_set_next_histinfo_seqno(FALSE); /* sets gtmsource_local->next_histinfo_seqno & next_histinfo_num */
if ((GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state) || (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state))
return; /* "gtmsource_set_next_histinfo_seqno" encountered REPLINSTNOHIST or concurrent online rollback occurred */
/*************** Read histinfo (to send) from instance file first ***************/
GRAB_LOCK(jnlpool.jnlpool_dummy_reg, HANDLE_CONCUR_ONLINE_ROLLBACK);
if (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state)
return;
assert(1 <= gtmsource_local->next_histinfo_num);
/* With non-supplementary instances, we are guaranteed, consecutive history records increase in start_seqno.
* But with supplementary instances, it is possible for two consecutive history records to have the same start_seqno
* (if those came from different root primary instances at the same time). Take care of that in the assert below.
*/
assert(!this_side->is_supplementary && (gtmsource_local->read_jnl_seqno < gtmsource_local->next_histinfo_seqno)
|| this_side->is_supplementary && (gtmsource_local->read_jnl_seqno <= gtmsource_local->next_histinfo_seqno));
gtmsource_histinfo_get(gtmsource_local->next_histinfo_num - 1, &histinfo);
if ((GTMSOURCE_WAITING_FOR_CONNECTION != gtmsource_state) && this_side->is_supplementary && first_histrec_send
&& (0 < histinfo.strm_index))
{ /* Supplementary source side sending to a supplementary receiver. And this is the FIRST history record
* being sent for this replication connection. It is possible that the closest history record prior to
* "gtmsource_local->read_jnl_seqno" has a non-zero "strm_index". In that case, we might be sending a
* non-zero strm_index history record across and it is possible that the secondary has an empty instance
* file (in case of an -updateresync startup). This would lead to issues on the receiving supplementary
* instance since it will now have a non-zero stream history record as the first history record in its
* instance file. This is an out-of-design situation since to establish the resync point between two
* supplementary instances, we need to examine the 0th stream and the assumption is that if the instance
* file has at least one history record, there is at least one 0th stream history record. Therefore we
* need to prevent this out-of-design situation. Towards that, find out the most recent 0th stream
* history record in this instance and send that across BEFORE sending the history record corresponding
* to "gtmsource_local->read_jnl_seqno".
*/
zero_histinfo_num = histinfo.last_histinfo_num[0];
assert(INVALID_HISTINFO_NUM != zero_histinfo_num);
assert(0 <= zero_histinfo_num);
gtmsource_histinfo_get(zero_histinfo_num, &zero_histinfo);
} else
zero_histinfo_num = INVALID_HISTINFO_NUM;
rel_lock(jnlpool.jnlpool_dummy_reg);
if (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state)
{
assert(FALSE);
return; /* Connection got reset in "gtmsource_histinfo_get" due to REPLINSTNOHIST */
}
assert(gtmsource_local->read_jnl_seqno >= histinfo.start_seqno);
assert(remote_side->endianness_known); /* only then is remote_side->cross_endian reliable */
if (REPL_PROTO_VER_SUPPLEMENTARY > remote_side->proto_ver)
{ /* Remote side does not support supplementary protocol. Use older protocol messages to communicate. */
assert(REPL_PROTO_VER_MULTISITE <= remote_side->proto_ver);
assert(!this_side->is_supplementary);
/*************** Send REPL_OLD_TRIPLE message ***************/
memset(&oldtriple_msg, 0, SIZEOF(oldtriple_msg));
oldtriple_msg.type = REPL_OLD_TRIPLE;
oldtriple_msg.len = SIZEOF(oldtriple_msg);
oldtriple_msg.triplecontent.jrec_type = JRT_TRIPLE;
if (remote_side->cross_endian && (this_side->jnl_ver < remote_side->jnl_ver))
{
oldtriple_msg.triplecontent.forwptr = GTM_BYTESWAP_24(SIZEOF(repl_old_triple_jnl_t));
oldtriple_msg.triplecontent.start_seqno = GTM_BYTESWAP_64(gtmsource_local->read_jnl_seqno);
oldtriple_msg.triplecontent.cycle = GTM_BYTESWAP_32(histinfo.root_primary_cycle);
} else
{
oldtriple_msg.triplecontent.forwptr = SIZEOF(repl_old_triple_jnl_t);
oldtriple_msg.triplecontent.start_seqno = gtmsource_local->read_jnl_seqno;
oldtriple_msg.triplecontent.cycle = histinfo.root_primary_cycle;
}
memcpy(oldtriple_msg.triplecontent.instname, histinfo.root_primary_instname, MAX_INSTNAME_LEN - 1);
gtmsource_repl_send((repl_msg_ptr_t)&oldtriple_msg, "REPL_OLD_TRIPLE",
gtmsource_local->read_jnl_seqno, INVALID_SUPPL_STRM);
} else
{ /* Remote side supports supplementary protocol. Communicate using new message protocol */
if (INVALID_HISTINFO_NUM != zero_histinfo_num)
{ /* Send REPL_HISTREC message corresponding to the 0th stream history record */
assert(gtmsource_local->read_jnl_seqno >= zero_histinfo.start_seqno);
GTMSOURCE_SEND_REPL_HISTREC(zero_histinfo, gtmsource_local, remote_side->cross_endian);
/* the above macro would have invoked "gtmsource_repl_send" so check for "gtmsource_state" */
if ((GTMSOURCE_CHANGING_MODE == gtmsource_state) || (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state))
return; /* send did not succeed */
/* If we have a non-zero stream history record, then check if its "start_seqno" is lesser than
* gtmsource_local->read_jnl_seqno. If so, we dont even need to send this history record. If it is
* equal though, we do need to send this across.
*/
assert(gtmsource_local->read_jnl_seqno >= histinfo.start_seqno);
if (gtmsource_local->read_jnl_seqno == histinfo.start_seqno)
GTMSOURCE_SEND_REPL_HISTREC(histinfo, gtmsource_local, remote_side->cross_endian);
} else
GTMSOURCE_SEND_REPL_HISTREC(histinfo, gtmsource_local, remote_side->cross_endian);
}
if ((GTMSOURCE_CHANGING_MODE == gtmsource_state) || (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state))
return; /* send did not succeed */
repl_dump_histinfo(gtmsource_log_fp, TRUE, TRUE, "New History Content", &histinfo);
gtmsource_local->send_new_histrec = FALSE;
}
/* This function is invoked once for each histinfo record that the source server goes past while sending journal records across.
* This function sets the boundary seqno field "next_histinfo_seqno" to be the "start_seqno" of the next histinfo record so the
* source server does not send any seqnos corresponding to the next histinfo record before sending a REPL_HISTREC message.
* It will set "gtmsource_local->next_histinfo_seqno" and "gtmsource_local->next_histinfo_num" to correspond to the next histinfo
* record and set the private copy "gtmsource_local->num_histinfo" to a copy of what is currently present in
* "repl_inst_filehdr->num_histinfo".
*
* The input variable "detect_new_histinfo" is set to TRUE if this function is called from "gtmsource_readfiles" or
* "gtmsource_readpool" the moment they detect that the instance file has had a new histinfo record since the last time this
* source server took a copy of it in its private "gtmsource_local->num_histinfo". In this case, the only objective
* is to find the start_seqno of the next histinfo record and note that down as "gtmsource_local->next_histinfo_seqno".
*
* If the input variable "detect_new_histinfo" is set to FALSE, "next_histinfo_seqno" is set to the starting seqno of the
* histinfo record immediately after that corresponding to "gtmsource_local->read_jnl_seqno".
*
* This can end up closing the connection if the call to "gtmsource_histinfo_get" or "repl_inst_histinfo_find_seqno" fails.
*/
void gtmsource_set_next_histinfo_seqno(boolean_t detect_new_histinfo)
{
unix_db_info *udi;
int4 status, next_histinfo_num, num_histinfo;
repl_histinfo next_histinfo, prev_histinfo;
gtmsource_local_ptr_t gtmsource_local;
repl_msg_t instnohist_msg;
char histdetail[256];
seq_num read_seqno;
DEBUG_ONLY(sgmnt_addrs *csa;)
assert((NULL != jnlpool.jnlpool_dummy_reg) && jnlpool.jnlpool_dummy_reg->open);
DEBUG_ONLY(
csa = &FILE_INFO(jnlpool.jnlpool_dummy_reg)->s_addrs;
ASSERT_VALID_JNLPOOL(csa);
)
GRAB_LOCK(jnlpool.jnlpool_dummy_reg, HANDLE_CONCUR_ONLINE_ROLLBACK);
if (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state)
return;
assert(NULL != jnlpool.repl_inst_filehdr); /* journal pool should be set up */
gtmsource_local = jnlpool.gtmsource_local;
next_histinfo_num = gtmsource_local->next_histinfo_num;
/* assert((-1 == next_histinfo_num) || (gtmsource_local->next_histinfo_seqno >= gtmsource_local->read_jnl_seqno)); */
read_seqno = gtmsource_local->read_jnl_seqno;
assert(gtmsource_local->next_histinfo_seqno >= read_seqno);
num_histinfo = jnlpool.repl_inst_filehdr->num_histinfo;
if (!detect_new_histinfo)
{
if (-1 == next_histinfo_num)
{ /* This is the first invocation of this function after this connection with a receiver was established.
* Find the first histinfo record in the local instance file corresponding to the next seqno to be sent
* across i.e. "gtmsource_local->read_jnl_seqno". The below function will return the history record
* just BEFORE read_jnl_seqno. So fetch the immediately next history record to get the desired record.
*/
assert(read_seqno <= jnlpool.jnlpool_ctl->jnl_seqno);
status = repl_inst_histinfo_find_seqno(read_seqno, INVALID_SUPPL_STRM, &prev_histinfo);
if (0 != status)
{
assert(ERR_REPLINSTNOHIST == status); /* only error returned by "repl_inst_histinfo_find_seqno" */
assert((INVALID_HISTINFO_NUM == prev_histinfo.histinfo_num)
|| (prev_histinfo.start_seqno >= read_seqno));
if ((INVALID_HISTINFO_NUM == prev_histinfo.histinfo_num)
|| (prev_histinfo.start_seqno > read_seqno))
{ /* The read seqno is PRIOR to the starting seqno of the instance file.
* In that case, issue error and close the connection.
*/
NON_GTM64_ONLY(SPRINTF(histdetail, "seqno [0x%llx]", read_seqno - 1));
GTM64_ONLY(SPRINTF(histdetail, "seqno [0x%lx]", read_seqno - 1));
udi = FILE_INFO(jnlpool.jnlpool_dummy_reg);
gtm_putmsg(VARLSTCNT(6) ERR_REPLINSTNOHIST, 4,
LEN_AND_STR(histdetail), LEN_AND_STR(udi->fn));
/* Send error status to the receiver server before closing the connection. This way the
* receiver will know to shut down rather than loop back trying to reconnect. This avoids
* an infinite loop of connection open/closes between the source and receiver servers.
*/
instnohist_msg.type = REPL_INST_NOHIST;
instnohist_msg.len = MIN_REPL_MSGLEN;
memset(&instnohist_msg.msg[0], 0, SIZEOF(instnohist_msg.msg));
gtmsource_repl_send((repl_msg_ptr_t)&instnohist_msg, "REPL_INST_NOHIST",
MAX_SEQNO, INVALID_SUPPL_STRM);
rel_lock(jnlpool.jnlpool_dummy_reg);
repl_log(gtmsource_log_fp, TRUE, TRUE,
"Connection reset due to above REPLINSTNOHIST error\n");
repl_close(&gtmsource_sock_fd);
SHORT_SLEEP(GTMSOURCE_WAIT_FOR_RECEIVER_CLOSE_CONN);
gtmsource_state = gtmsource_local->gtmsource_state = GTMSOURCE_WAITING_FOR_CONNECTION;
return;
}
next_histinfo_num = prev_histinfo.histinfo_num;
} else
{
assert(prev_histinfo.start_seqno < read_seqno);
next_histinfo_num = prev_histinfo.histinfo_num;
if ((next_histinfo_num + 1) < num_histinfo)
{
gtmsource_histinfo_get(next_histinfo_num + 1, &next_histinfo);
if (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state)
{
assert(FALSE);
rel_lock(jnlpool.jnlpool_dummy_reg);
return; /* Connection got reset in "gtmsource_histinfo_get" due to REPLINSTNOHIST */
}
assert(next_histinfo.start_seqno >= read_seqno);
if (next_histinfo.start_seqno == read_seqno)
{
next_histinfo_num++;
assert(next_histinfo_num == next_histinfo.histinfo_num);
}
}
}
}
/* else: next_histinfo_num was already found. So just move on to the NEXT history record. */
assert(0 <= next_histinfo_num);
assert(next_histinfo_num < num_histinfo);
assert((gtmsource_local->next_histinfo_num != next_histinfo_num)
|| (read_seqno == gtmsource_local->next_histinfo_seqno)
|| (MAX_SEQNO == gtmsource_local->next_histinfo_seqno));
next_histinfo_num++;
} else
{ /* A new histinfo record got added to the instance file since we knew last.
* Set "next_histinfo_seqno" for our current histinfo down from its current value of MAX_SEQNO.
*/
assert(gtmsource_local->next_histinfo_seqno == MAX_SEQNO);
assert(next_histinfo_num < num_histinfo);
if (READ_FILE == gtmsource_local->read_state)
{ /* It is possible that we have already read the journal records for the next
* read_jnl_seqno before detecting that a histinfo has to be sent first. In that case,
* the journal files may have been positioned ahead of the read_jnl_seqno for the
* next read. Indicate that they have to be repositioned into the past.
*/
gtmsource_set_lookback();
}
}
if (num_histinfo > next_histinfo_num)
{ /* Read the next histinfo record to determine its "start_seqno" */
gtmsource_histinfo_get(next_histinfo_num, &next_histinfo);
if (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state)
{
assert(FALSE);
rel_lock(jnlpool.jnlpool_dummy_reg);
return; /* Connection got reset in "gtmsource_histinfo_get" due to REPLINSTNOHIST */
}
assert(next_histinfo.start_seqno >= read_seqno);
gtmsource_local->next_histinfo_seqno = next_histinfo.start_seqno;
} else
gtmsource_local->next_histinfo_seqno = MAX_SEQNO;
gtmsource_local->next_histinfo_num = next_histinfo_num;
gtmsource_local->num_histinfo = num_histinfo;
rel_lock(jnlpool.jnlpool_dummy_reg);
}