/****************************************************************
 *								*
 *	Copyright 2006, 2010 Fidelity Information Services, Inc	*
 *								*
 *	This source code contains the intellectual property	*
 *	of its copyright holder(s), and is made available	*
 *	under a license.  If you do not know the terms of	*
 *	the license, please stop and do not read further.	*
 *								*
 ****************************************************************/

#include "mdef.h"

#include "gtm_string.h"
#include "gtm_time.h"
#include "gtm_inet.h"	/* Required for gtmsource.h */
#include "gtm_stdio.h"

/* NOTE(review): the operands of the next three system includes were missing from the text under review.
 * <sys/time.h> and <errno.h> are restored from usage in this file (struct timeval, errno); <descrip.h> is the
 * presumed VMS header - confirm all three against the repository copy.
 */
#include <sys/time.h>
#include <errno.h>
#ifdef VMS
#include <descrip.h>	/* Required for gtmsource.h */
#endif

#include "gdsroot.h"
#include "gdsblk.h"
#include "gtm_facility.h"
#include "fileinfo.h"
#include "gdsbt.h"
#include "gdsfhead.h"
#include "filestruct.h"
#include "jnl.h"
#include "repl_msg.h"
#include "gtmsource.h"
#include "repl_comm.h"
#include "repl_dbg.h"
#include "repl_log.h"
#include "repl_errno.h"
#include "iosp.h"
#include "gt_timer.h"
#include "gtmsource_heartbeat.h"
#include "relqop.h"

GBLREF	jnlpool_addrs		jnlpool;
GBLREF	int			gtmsource_sock_fd;
GBLREF	boolean_t		gtmsource_logstats;
GBLREF	int			gtmsource_log_fd;
GBLREF	FILE			*gtmsource_log_fp;
GBLREF	int			gtmsource_statslog_fd;
GBLREF	FILE			*gtmsource_statslog_fp;
GBLREF	struct timeval		gtmsource_poll_wait, gtmsource_poll_immediate;
GBLREF	gtmsource_state_t	gtmsource_state;
GBLREF	gd_addr			*gd_header;

GBLDEF	boolean_t			heartbeat_stalled = TRUE;
GBLDEF	repl_heartbeat_que_entry_t	*repl_heartbeat_que_head = NULL;	/* queue of heartbeats sent, not yet acked */
GBLDEF	repl_heartbeat_que_entry_t	*repl_heartbeat_free_head = NULL;	/* queue of unused heartbeat entries */
GBLDEF	volatile time_t			gtmsource_now;				/* advanced by gtmsource_heartbeat_timer */
GBLDEF	time_t				last_sent_time, earliest_sent_time;

static	int				heartbeat_period, heartbeat_max_wait;	/* both in seconds */

/* Timer handler: advances gtmsource_now by heartbeat_period seconds and re-arms itself for the next period.
 *
 * tid, interval_len	- not used by this handler.
 * interval_ptr		- on Unix, asserted to point at a value equal to heartbeat_period; a dummy on VMS.
 */
void gtmsource_heartbeat_timer(TID tid, int4 interval_len, int *interval_ptr)
{
	assert(0 != gtmsource_now);
	UNIX_ONLY(assert(*interval_ptr == heartbeat_period);)	/* interval_len and interval_ptr are dummies on VMS */
	gtmsource_now += heartbeat_period;			/* cannot use *interval_ptr on VMS */
	REPL_DPRINT2("Starting heartbeat timer with %d s\n", heartbeat_period);
	/* start_timer expects time interval in milliseconds, heartbeat_period is in seconds */
	start_timer((TID)gtmsource_heartbeat_timer, heartbeat_period * 1000, gtmsource_heartbeat_timer,
			SIZEOF(heartbeat_period), &heartbeat_period);
}

/* Reads the heartbeat period and maximum wait from the journal pool connection parameters, allocates the
 * heartbeat entry array, sets up the "sent" and "free" queues, initializes the time keeping globals and
 * starts the heartbeat timer.
 *
 * Returns SS_NORMAL. An allocation failure is reported through rts_error.
 */
int gtmsource_init_heartbeat(void)
{
	int				num_q_entries;
	repl_heartbeat_que_entry_t	*heartbeat_element;

	error_def(ERR_REPLCOMM);
	error_def(ERR_TEXT);

	assert(NULL == repl_heartbeat_que_head);

	heartbeat_period = jnlpool.gtmsource_local->connect_parms[GTMSOURCE_CONN_HEARTBEAT_PERIOD];
	heartbeat_max_wait = jnlpool.gtmsource_local->connect_parms[GTMSOURCE_CONN_HEARTBEAT_MAX_WAIT];
	/* The "+ 2" accounts for the two queue header entries (sent queue head and free queue head) below */
	num_q_entries = DIVIDE_ROUND_UP(heartbeat_max_wait, heartbeat_period) + 2;
	REPL_DPRINT4("Initialized heartbeat, heartbeat_period = %d s, heartbeat_max_wait = %d s, num_q_entries = %d\n",
			heartbeat_period, heartbeat_max_wait, num_q_entries);
	if (!(repl_heartbeat_que_head = (repl_heartbeat_que_entry_t *)malloc(num_q_entries * SIZEOF(repl_heartbeat_que_entry_t))))
		rts_error(VARLSTCNT(7) ERR_REPLCOMM, 0, ERR_TEXT, 2,
				RTS_ERROR_LITERAL("Error in allocating heartbeat queue"), errno);

	memset(repl_heartbeat_que_head, 0, num_q_entries * SIZEOF(repl_heartbeat_que_entry_t));
	/* Entry 0 heads the queue of sent heartbeats, entry 1 heads the free queue */
	repl_heartbeat_free_head = repl_heartbeat_que_head + 1;
	/* gtmsource_process_heartbeat reads the sent queue head's own ack_time when that queue is empty (que.fl is 0),
	 * so a zero here yields an earliest_sent_time of 0 in that case.
	 */
	*(gtm_time4_t *)&repl_heartbeat_que_head->heartbeat.ack_time[0] = 0;
	*(gtm_time4_t *)&repl_heartbeat_free_head->heartbeat.ack_time[0] = 0;
	/* All remaining entries start out on the free queue */
	for (heartbeat_element = repl_heartbeat_free_head + 1, num_q_entries -= 2;
		num_q_entries > 0;
		num_q_entries--, heartbeat_element++)
	{
		insqt((que_ent_ptr_t)heartbeat_element, (que_ent_ptr_t)repl_heartbeat_free_head);
	}
	last_sent_time = gtmsource_now = time(NULL);
	/* Ideally, we should use the Greatest Common Factor of heartbeat_period and GTMSOURCE_LOGSTATS_INTERVAL as the time keeper
	 * interval. As it stands now, we may not honor GTMSOURCE_LOGSTATS_INTERVAL if user specifies a heartbeat value
	 * larger than GTMSOURCE_LOGSTATS_INTERVAL. When we make GTMSOURCE_LOGSTATS_INTERVAL a user configurable parameter,
	 * this code may have to be revisited. Also, modify the check in gtmsource_process (prev_now != (save_now = gtmsource_now))
	 * to be something like (heartbeat_period < difftime((save_now = gtmsource_now), prev_now)). Vinaya 2003, Sep 08
	 */
	/* start_timer expects time interval in milliseconds, heartbeat_period is in seconds */
	start_timer((TID)gtmsource_heartbeat_timer, heartbeat_period * 1000, gtmsource_heartbeat_timer,
			SIZEOF(heartbeat_period), &heartbeat_period);
	heartbeat_stalled = FALSE;
	earliest_sent_time = 0;
	return (SS_NORMAL);
}

/* Cancels the heartbeat timer, releases the heartbeat entry array and resets all heartbeat globals to their
 * "not running" values (queue heads NULL, times 0, heartbeat_stalled TRUE).
 *
 * Returns SS_NORMAL.
 */
int gtmsource_stop_heartbeat(void)
{
	cancel_timer((TID)gtmsource_heartbeat_timer);
	if (NULL != repl_heartbeat_que_head)
		free(repl_heartbeat_que_head);
	repl_heartbeat_que_head = NULL;
	repl_heartbeat_free_head = NULL;
	last_sent_time = 0;
	earliest_sent_time = 0;
	gtmsource_now = 0;
	heartbeat_stalled = TRUE;
	REPL_DPRINT1("Stopped heartbeat\n");
	return (SS_NORMAL);
}

/* Checks whether the oldest unacknowledged heartbeat has been outstanding for more than heartbeat_max_wait seconds.
 *
 * now			- pointer to the current time.
 * overdue_heartbeat	- output; receives a copy of the overdue heartbeat message when TRUE is returned.
 *
 * Returns TRUE if a heartbeat is overdue; its entry is removed from the sent queue and put on the free queue.
 * Returns FALSE if nothing is outstanding (earliest_sent_time is 0), the wait has not been exceeded, the sent
 * queue is unexpectedly empty, or REPL_DISABLE_HEARTBEAT is defined.
 */
boolean_t gtmsource_is_heartbeat_overdue(time_t *now, repl_heartbeat_msg_ptr_t overdue_heartbeat)
{
	repl_heartbeat_que_entry_t	*heartbeat_element;
	double				time_elapsed;
	unsigned char			seq_num_str[32], *seq_num_ptr;

#	ifndef REPL_DISABLE_HEARTBEAT
	if (0 == earliest_sent_time
		|| (time_elapsed = difftime(*now, earliest_sent_time)) <= (double)heartbeat_max_wait)
		return (FALSE);

	heartbeat_element = (repl_heartbeat_que_entry_t *)remqh((que_ent_ptr_t)repl_heartbeat_que_head);
	if (NULL == heartbeat_element)
	{	/* earliest_sent_time is non-zero, so the sent queue is not expected to be empty */
		assert(FALSE);
		return (FALSE);
	}

	memcpy(overdue_heartbeat, &heartbeat_element->heartbeat, SIZEOF(repl_heartbeat_msg_t));
	/* Cast the time values to long so that they match the %ld conversions */
	REPL_DPRINT5("Overdue heartbeat - SEQNO : "INT8_FMT" time : %ld now : %ld difftime : %00.f\n",
			INT8_PRINT(*(seq_num *)&overdue_heartbeat->ack_seqno[0]),
			(long)*(gtm_time4_t *)&overdue_heartbeat->ack_time[0], (long)*now, time_elapsed);

	insqt((que_ent_ptr_t)heartbeat_element, (que_ent_ptr_t)repl_heartbeat_free_head);
	return (TRUE);
#	else
	return (FALSE);
#	endif
}

/* Builds a heartbeat message stamped with the journal pool's current jnl_seqno and the time in *now, and sends it
 * on gtmsource_sock_fd.
 *
 * now	- pointer to the current time; recorded in the message and in last_sent_time on a successful send.
 *
 * Returns SS_NORMAL when
 *	- no free heartbeat entry is available (nothing is sent),
 *	- the send completes (the entry is appended to the sent queue),
 *	- gtmsource_state becomes GTMSOURCE_WAITING_FOR_CONNECTION or GTMSOURCE_CHANGING_MODE while sending, or
 *	- the connection was reset (the socket is closed and the state set to GTMSOURCE_WAITING_FOR_CONNECTION).
 * Other send/select failures are reported through rts_error.
 */
int gtmsource_send_heartbeat(time_t *now)
{
	repl_heartbeat_que_entry_t	*heartbeat_element;
	unsigned char			*msg_ptr;				/* needed for REPL_{SEND,RECV}_LOOP */
	int				tosend_len, sent_len, sent_this_iter;	/* needed for REPL_SEND_LOOP */
	int				status;					/* needed for REPL_{SEND,RECV}_LOOP */
	unsigned char			seq_num_str[32], *seq_num_ptr;
	gtmsource_local_ptr_t		gtmsource_local;

	error_def(ERR_REPLCOMM);
	error_def(ERR_TEXT);

	heartbeat_element = (repl_heartbeat_que_entry_t *)remqh((que_ent_ptr_t)repl_heartbeat_free_head);
	if (NULL == heartbeat_element) /* Too many pending heartbeats, send later */
		return (SS_NORMAL);

	QWASSIGN(*(seq_num *)&heartbeat_element->heartbeat.ack_seqno[0], jnlpool.jnlpool_ctl->jnl_seqno);
	*(gtm_time4_t *)&heartbeat_element->heartbeat.ack_time[0] = (gtm_time4_t)(*now);

	heartbeat_element->heartbeat.type = REPL_HEARTBEAT;
	heartbeat_element->heartbeat.len = MIN_REPL_MSGLEN;
	REPL_SEND_LOOP(gtmsource_sock_fd, &heartbeat_element->heartbeat, MIN_REPL_MSGLEN, FALSE, &gtmsource_poll_immediate)
	{
		gtmsource_poll_actions(FALSE);  /* Recursive call */
		if (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state
			|| GTMSOURCE_CHANGING_MODE == gtmsource_state)
			return (SS_NORMAL);
	}

	if (SS_NORMAL == status)
	{
		insqt((que_ent_ptr_t)heartbeat_element, (que_ent_ptr_t)repl_heartbeat_que_head);
		last_sent_time = *now;
		if (0 == earliest_sent_time)	/* this is the only outstanding heartbeat */
			earliest_sent_time = last_sent_time;
		/* Cast the time values to long so that they match the %ld conversions */
		REPL_DPRINT4("HEARTBEAT sent with time %ld SEQNO "INT8_FMT" at %ld\n",
				(long)*(gtm_time4_t *)&heartbeat_element->heartbeat.ack_time[0],
				INT8_PRINT(*(seq_num *)&heartbeat_element->heartbeat.ack_seqno[0]), (long)time(NULL));
		return (SS_NORMAL);
	}

	if (EREPL_SEND == repl_errno && REPL_CONN_RESET(status))
	{
		repl_log(gtmsource_log_fp, TRUE, TRUE, "Connection reset while attempting to send heartbeat. Status = %d ; %s\n",
				status, STRERROR(status));
		repl_close(&gtmsource_sock_fd);
		gtmsource_state = jnlpool.gtmsource_local->gtmsource_state = GTMSOURCE_WAITING_FOR_CONNECTION;
		return (SS_NORMAL);
	}

	if (EREPL_SEND == repl_errno)
		rts_error(VARLSTCNT(7) ERR_REPLCOMM, 0, ERR_TEXT, 2,
				RTS_ERROR_LITERAL("Error sending HEARTBEAT message. Error in send"), status);

	if (EREPL_SELECT == repl_errno)
		rts_error(VARLSTCNT(7) ERR_REPLCOMM, 0, ERR_TEXT, 2,
				RTS_ERROR_LITERAL("Error sending HEARTBEAT message. Error in select"), status);

	GTMASSERT;
	return -1; /* This will never get executed, added to make compiler happy */
}

/* Processes a heartbeat response: moves entries from the head of the sent queue to the free queue for as long as the
 * acknowledged time in heartbeat_msg is not earlier than earliest_sent_time, refreshing earliest_sent_time from the
 * new head of the sent queue after each move.
 *
 * heartbeat_msg	- the heartbeat message received; its ack_seqno and ack_time fields are read.
 *
 * Returns SS_NORMAL.
 */
int gtmsource_process_heartbeat(repl_heartbeat_msg_ptr_t heartbeat_msg)
{
	repl_heartbeat_que_entry_t	*heartbeat_element;
	seq_num				ack_seqno;
	gd_region			*reg, *region_top;
	sgmnt_addrs			*csa;
	unsigned char			seq_num_str[32], *seq_num_ptr;

	QWASSIGN(ack_seqno, *(seq_num *)&heartbeat_msg->ack_seqno[0]);

	/* Cast the time values to long so that they match the %ld conversions */
	REPL_DPRINT4("HEARTBEAT received with time %ld SEQNO "INT8_FMT" at %ld\n",
			(long)*(gtm_time4_t *)&heartbeat_msg->ack_time[0], INT8_PRINT(ack_seqno), (long)time(NULL));

	for (heartbeat_element = (repl_heartbeat_que_entry_t *)remqh((que_ent_ptr_t)repl_heartbeat_que_head);
		NULL != heartbeat_element
			&& *(gtm_time4_t *)&heartbeat_msg->ack_time[0] >= (gtm_time4_t)earliest_sent_time;
		heartbeat_element = (repl_heartbeat_que_entry_t *)remqh((que_ent_ptr_t)repl_heartbeat_que_head))
	{
		insqt((que_ent_ptr_t)heartbeat_element, (que_ent_ptr_t)repl_heartbeat_free_head);
		/* The queue links are offsets relative to the entry holding them: head + head->que.fl is the first
		 * entry still on the sent queue, or the head itself (whose ack_time is 0) when que.fl is 0.
		 */
		earliest_sent_time = (time_t)*(gtm_time4_t *)&((repl_heartbeat_que_entry_t *)
				((unsigned char *)repl_heartbeat_que_head + repl_heartbeat_que_head->que.fl))->heartbeat.ack_time[0];
	}

	/* An entry was removed but is not covered by this ack; put it back at the head of the sent queue */
	if (NULL != heartbeat_element)
		insqh((que_ent_ptr_t)heartbeat_element, (que_ent_ptr_t)repl_heartbeat_que_head);

	return (SS_NORMAL);
}