256 lines
9.7 KiB
C
256 lines
9.7 KiB
C
|
/****************************************************************
|
||
|
* *
|
||
|
* Copyright 2006, 2010 Fidelity Information Services, Inc *
|
||
|
* *
|
||
|
* This source code contains the intellectual property *
|
||
|
* of its copyright holder(s), and is made available *
|
||
|
* under a license. If you do not know the terms of *
|
||
|
* the license, please stop and do not read further. *
|
||
|
* *
|
||
|
****************************************************************/
|
||
|
|
||
|
#include "mdef.h"
|
||
|
|
||
|
#include "gtm_string.h"
|
||
|
#include "gtm_time.h"
|
||
|
#include "gtm_inet.h" /* Required for gtmsource.h */
|
||
|
#include "gtm_stdio.h"
|
||
|
|
||
|
#include <sys/time.h>
|
||
|
#include <errno.h>
|
||
|
#ifdef VMS
|
||
|
#include <descrip.h> /* Required for gtmsource.h */
|
||
|
#endif
|
||
|
|
||
|
#include "gdsroot.h"
|
||
|
#include "gdsblk.h"
|
||
|
#include "gtm_facility.h"
|
||
|
#include "fileinfo.h"
|
||
|
#include "gdsbt.h"
|
||
|
#include "gdsfhead.h"
|
||
|
#include "filestruct.h"
|
||
|
#include "jnl.h"
|
||
|
#include "repl_msg.h"
|
||
|
#include "gtmsource.h"
|
||
|
#include "repl_comm.h"
|
||
|
#include "repl_dbg.h"
|
||
|
#include "repl_log.h"
|
||
|
#include "repl_errno.h"
|
||
|
#include "iosp.h"
|
||
|
#include "gt_timer.h"
|
||
|
#include "gtmsource_heartbeat.h"
|
||
|
#include "relqop.h"
|
||
|
|
||
|
GBLREF jnlpool_addrs jnlpool;
|
||
|
GBLREF int gtmsource_sock_fd;
|
||
|
GBLREF boolean_t gtmsource_logstats;
|
||
|
GBLREF int gtmsource_log_fd;
|
||
|
GBLREF FILE *gtmsource_log_fp;
|
||
|
GBLREF int gtmsource_statslog_fd;
|
||
|
GBLREF FILE *gtmsource_statslog_fp;
|
||
|
GBLREF struct timeval gtmsource_poll_wait, gtmsource_poll_immediate;
|
||
|
GBLREF gtmsource_state_t gtmsource_state;
|
||
|
GBLREF gd_addr *gd_header;
|
||
|
|
||
|
GBLDEF boolean_t heartbeat_stalled = TRUE;
|
||
|
GBLDEF repl_heartbeat_que_entry_t *repl_heartbeat_que_head = NULL;
|
||
|
GBLDEF repl_heartbeat_que_entry_t *repl_heartbeat_free_head = NULL;
|
||
|
GBLDEF volatile time_t gtmsource_now;
|
||
|
GBLDEF time_t last_sent_time, earliest_sent_time;
|
||
|
|
||
|
static int heartbeat_period, heartbeat_max_wait;
|
||
|
|
||
|
void gtmsource_heartbeat_timer(TID tid, int4 interval_len, int *interval_ptr)
|
||
|
{
|
||
|
assert(0 != gtmsource_now);
|
||
|
UNIX_ONLY(assert(*interval_ptr == heartbeat_period);) /* interval_len and interval_ptr are dummies on VMS */
|
||
|
gtmsource_now += heartbeat_period; /* cannot use *interval_ptr on VMS */
|
||
|
REPL_DPRINT2("Starting heartbeat timer with %d s\n", heartbeat_period);
|
||
|
start_timer((TID)gtmsource_heartbeat_timer, heartbeat_period * 1000, gtmsource_heartbeat_timer, SIZEOF(heartbeat_period),
|
||
|
&heartbeat_period); /* start_timer expects time interval in milli seconds, heartbeat_period is in seconds */
|
||
|
}
|
||
|
|
||
|
int gtmsource_init_heartbeat(void)
|
||
|
{
|
||
|
int num_q_entries;
|
||
|
repl_heartbeat_que_entry_t *heartbeat_element;
|
||
|
|
||
|
error_def(ERR_REPLCOMM);
|
||
|
error_def(ERR_TEXT);
|
||
|
|
||
|
assert(NULL == repl_heartbeat_que_head);
|
||
|
|
||
|
heartbeat_period = jnlpool.gtmsource_local->connect_parms[GTMSOURCE_CONN_HEARTBEAT_PERIOD];
|
||
|
heartbeat_max_wait = jnlpool.gtmsource_local->connect_parms[GTMSOURCE_CONN_HEARTBEAT_MAX_WAIT];
|
||
|
num_q_entries = DIVIDE_ROUND_UP(heartbeat_max_wait, heartbeat_period) + 2;
|
||
|
REPL_DPRINT4("Initialized heartbeat, heartbeat_period = %d s, heartbeat_max_wait = %d s, num_q_entries = %d\n",
|
||
|
heartbeat_period, heartbeat_max_wait, num_q_entries);
|
||
|
if (!(repl_heartbeat_que_head = (repl_heartbeat_que_entry_t *)malloc(num_q_entries * SIZEOF(repl_heartbeat_que_entry_t))))
|
||
|
rts_error(VARLSTCNT(7) ERR_REPLCOMM, 0, ERR_TEXT, 2,
|
||
|
RTS_ERROR_LITERAL("Error in allocating heartbeat queue"), errno);
|
||
|
|
||
|
memset(repl_heartbeat_que_head, 0, num_q_entries * SIZEOF(repl_heartbeat_que_entry_t));
|
||
|
repl_heartbeat_free_head = repl_heartbeat_que_head + 1;
|
||
|
*(gtm_time4_t *)&repl_heartbeat_que_head->heartbeat.ack_time[0] = 0;
|
||
|
*(gtm_time4_t *)&repl_heartbeat_free_head->heartbeat.ack_time[0] = 0;
|
||
|
for (heartbeat_element = repl_heartbeat_free_head + 1, num_q_entries -= 2;
|
||
|
num_q_entries > 0;
|
||
|
num_q_entries--, heartbeat_element++)
|
||
|
{
|
||
|
insqt((que_ent_ptr_t)heartbeat_element, (que_ent_ptr_t)repl_heartbeat_free_head);
|
||
|
}
|
||
|
last_sent_time = gtmsource_now = time(NULL);
|
||
|
/* Ideally, we should use the Greatest Common Factor of heartbeat_period and GTMSOURCE_LOGSTATS_INTERVAL as the time keeper
|
||
|
* interval. As it stands now, we may not honor GTMSOURCE_LOGSTATS_INTERVAL if user specifies a heartbeat value
|
||
|
* larger than GTMSOURCE_LOGSTATS_INTERVAL. When we make GTMSOURCE_LOGSTATS_INTERVAL a user configurable parameter,
|
||
|
* this code may have to be revisited. Also, modify the check in gtmsource_process (prev_now != (save_now = gtmsource_now))
|
||
|
* to be something like (hearbeat_period < difftime((save_now = gtmsource_now), prev_now)). Vinaya 2003, Sep 08
|
||
|
*/
|
||
|
start_timer((TID)gtmsource_heartbeat_timer, heartbeat_period * 1000, gtmsource_heartbeat_timer, SIZEOF(heartbeat_period),
|
||
|
&heartbeat_period); /* start_timer expects time interval in milli seconds, heartbeat_period is in seconds */
|
||
|
heartbeat_stalled = FALSE;
|
||
|
earliest_sent_time = 0;
|
||
|
return (SS_NORMAL);
|
||
|
}
|
||
|
|
||
|
int gtmsource_stop_heartbeat(void)
|
||
|
{
|
||
|
cancel_timer((TID)gtmsource_heartbeat_timer);
|
||
|
if (NULL != repl_heartbeat_que_head)
|
||
|
free(repl_heartbeat_que_head);
|
||
|
repl_heartbeat_que_head = NULL;
|
||
|
repl_heartbeat_free_head = NULL;
|
||
|
last_sent_time = 0;
|
||
|
earliest_sent_time = 0;
|
||
|
gtmsource_now = 0;
|
||
|
heartbeat_stalled = TRUE;
|
||
|
REPL_DPRINT1("Stopped heartbeat\n");
|
||
|
return (SS_NORMAL);
|
||
|
}
|
||
|
|
||
|
boolean_t gtmsource_is_heartbeat_overdue(time_t *now, repl_heartbeat_msg_ptr_t overdue_heartbeat)
|
||
|
{
|
||
|
|
||
|
repl_heartbeat_que_entry_t *heartbeat_element;
|
||
|
double time_elapsed;
|
||
|
unsigned char seq_num_str[32], *seq_num_ptr;
|
||
|
|
||
|
#ifndef REPL_DISABLE_HEARTBEAT
|
||
|
if (0 == earliest_sent_time ||
|
||
|
(time_elapsed = difftime(*now, earliest_sent_time)) <= (double)heartbeat_max_wait)
|
||
|
return (FALSE);
|
||
|
|
||
|
heartbeat_element = (repl_heartbeat_que_entry_t *)remqh((que_ent_ptr_t)repl_heartbeat_que_head);
|
||
|
if (NULL == heartbeat_element)
|
||
|
{
|
||
|
assert(FALSE);
|
||
|
return (FALSE);
|
||
|
}
|
||
|
|
||
|
memcpy(overdue_heartbeat, &heartbeat_element->heartbeat, SIZEOF(repl_heartbeat_msg_t));
|
||
|
|
||
|
REPL_DPRINT5("Overdue heartbeat - SEQNO : "INT8_FMT" time : %ld now : %ld difftime : %00.f\n",
|
||
|
INT8_PRINT(*(seq_num *)&overdue_heartbeat->ack_seqno[0]), *(gtm_time4_t *)&overdue_heartbeat->ack_time[0],
|
||
|
*now, time_elapsed);
|
||
|
|
||
|
insqt((que_ent_ptr_t)heartbeat_element, (que_ent_ptr_t)repl_heartbeat_free_head);
|
||
|
|
||
|
return (TRUE);
|
||
|
#else
|
||
|
return (FALSE);
|
||
|
#endif
|
||
|
}
|
||
|
|
||
|
int gtmsource_send_heartbeat(time_t *now)
|
||
|
{
|
||
|
repl_heartbeat_que_entry_t *heartbeat_element;
|
||
|
unsigned char *msg_ptr; /* needed for REPL_{SEND,RECV}_LOOP */
|
||
|
int tosend_len, sent_len, sent_this_iter; /* needed for REPL_SEND_LOOP */
|
||
|
int status; /* needed for REPL_{SEND,RECV}_LOOP */
|
||
|
unsigned char seq_num_str[32], *seq_num_ptr;
|
||
|
gtmsource_local_ptr_t gtmsource_local;
|
||
|
|
||
|
error_def(ERR_REPLCOMM);
|
||
|
error_def(ERR_TEXT);
|
||
|
|
||
|
heartbeat_element = (repl_heartbeat_que_entry_t *)remqh((que_ent_ptr_t)repl_heartbeat_free_head);
|
||
|
if (NULL == heartbeat_element) /* Too many pending heartbeats, send later */
|
||
|
return (SS_NORMAL);
|
||
|
|
||
|
QWASSIGN(*(seq_num *)&heartbeat_element->heartbeat.ack_seqno[0], jnlpool.jnlpool_ctl->jnl_seqno);
|
||
|
*(gtm_time4_t *)&heartbeat_element->heartbeat.ack_time[0] = (gtm_time4_t)(*now);
|
||
|
|
||
|
heartbeat_element->heartbeat.type = REPL_HEARTBEAT;
|
||
|
heartbeat_element->heartbeat.len = MIN_REPL_MSGLEN;
|
||
|
REPL_SEND_LOOP(gtmsource_sock_fd, &heartbeat_element->heartbeat, MIN_REPL_MSGLEN, FALSE, >msource_poll_immediate)
|
||
|
{
|
||
|
gtmsource_poll_actions(FALSE); /* Recursive call */
|
||
|
if (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state ||
|
||
|
GTMSOURCE_CHANGING_MODE == gtmsource_state)
|
||
|
return (SS_NORMAL);
|
||
|
}
|
||
|
|
||
|
if (SS_NORMAL == status)
|
||
|
{
|
||
|
insqt((que_ent_ptr_t)heartbeat_element, (que_ent_ptr_t)repl_heartbeat_que_head);
|
||
|
last_sent_time = *now;
|
||
|
if (0 == earliest_sent_time)
|
||
|
earliest_sent_time = last_sent_time;
|
||
|
|
||
|
REPL_DPRINT4("HEARTBEAT sent with time %ld SEQNO "INT8_FMT" at %ld\n",
|
||
|
*(gtm_time4_t *)&heartbeat_element->heartbeat.ack_time[0],
|
||
|
INT8_PRINT(*(seq_num *)&heartbeat_element->heartbeat.ack_seqno[0]), time(NULL));
|
||
|
|
||
|
return (SS_NORMAL);
|
||
|
}
|
||
|
|
||
|
if (EREPL_SEND == repl_errno && REPL_CONN_RESET(status))
|
||
|
{
|
||
|
repl_log(gtmsource_log_fp, TRUE, TRUE, "Connection reset while attempting to send heartbeat. Status = %d ; %s\n",
|
||
|
status, STRERROR(status));
|
||
|
repl_close(>msource_sock_fd);
|
||
|
gtmsource_state = jnlpool.gtmsource_local->gtmsource_state = GTMSOURCE_WAITING_FOR_CONNECTION;
|
||
|
return (SS_NORMAL);
|
||
|
}
|
||
|
if (EREPL_SEND == repl_errno)
|
||
|
rts_error(VARLSTCNT(7) ERR_REPLCOMM, 0, ERR_TEXT, 2,
|
||
|
RTS_ERROR_LITERAL("Error sending HEARTBEAT message. Error in send"), status);
|
||
|
|
||
|
if (EREPL_SELECT == repl_errno)
|
||
|
rts_error(VARLSTCNT(7) ERR_REPLCOMM, 0, ERR_TEXT, 2,
|
||
|
RTS_ERROR_LITERAL("Error sending HEARTBEAT message. Error in select"), status);
|
||
|
|
||
|
GTMASSERT;
|
||
|
return -1; /* This will never get executed, added to make compiler happy */
|
||
|
}
|
||
|
|
||
|
int gtmsource_process_heartbeat(repl_heartbeat_msg_ptr_t heartbeat_msg)
|
||
|
{
|
||
|
repl_heartbeat_que_entry_t *heartbeat_element;
|
||
|
seq_num ack_seqno;
|
||
|
gd_region *reg, *region_top;
|
||
|
sgmnt_addrs *csa;
|
||
|
unsigned char seq_num_str[32], *seq_num_ptr;
|
||
|
|
||
|
QWASSIGN(ack_seqno, *(seq_num *)&heartbeat_msg->ack_seqno[0]);
|
||
|
REPL_DPRINT4("HEARTBEAT received with time %ld SEQNO "INT8_FMT" at %ld\n",
|
||
|
*(gtm_time4_t *)&heartbeat_msg->ack_time[0], INT8_PRINT(ack_seqno), time(NULL));
|
||
|
|
||
|
for (heartbeat_element = (repl_heartbeat_que_entry_t *)remqh((que_ent_ptr_t)repl_heartbeat_que_head);
|
||
|
NULL != heartbeat_element&&
|
||
|
*(gtm_time4_t *)&heartbeat_msg->ack_time[0] >= (gtm_time4_t)earliest_sent_time;
|
||
|
heartbeat_element = (repl_heartbeat_que_entry_t *)remqh((que_ent_ptr_t)repl_heartbeat_que_head))
|
||
|
{
|
||
|
insqt((que_ent_ptr_t)heartbeat_element, (que_ent_ptr_t)repl_heartbeat_free_head);
|
||
|
earliest_sent_time =
|
||
|
(time_t) *(gtm_time4_t *)&((repl_heartbeat_que_entry_t *)
|
||
|
((unsigned char *)repl_heartbeat_que_head +
|
||
|
repl_heartbeat_que_head->que.fl))->heartbeat.ack_time[0];
|
||
|
}
|
||
|
|
||
|
if (NULL != heartbeat_element)
|
||
|
insqh((que_ent_ptr_t)heartbeat_element, (que_ent_ptr_t)repl_heartbeat_que_head);
|
||
|
|
||
|
return (SS_NORMAL);
|
||
|
}
|