462 lines
14 KiB
C
462 lines
14 KiB
C
/****************************************************************
 *								*
 *	Copyright 2001, 2011 Fidelity Information Services, Inc	*
 *								*
 *	This source code contains the intellectual property	*
 *	of its copyright holder(s), and is made available	*
 *	under a license. If you do not know the terms of	*
 *	the license, please stop and do not read further.	*
 *								*
 ****************************************************************/
|
|
|
|
#include "mdef.h"
|
|
|
|
#include "gtm_string.h"
|
|
|
|
#ifdef UNIX
|
|
#include "gtm_stat.h"
|
|
#elif defined(VMS)
|
|
#include <fab.h>
|
|
#include <iodef.h>
|
|
#include <nam.h>
|
|
#include <rmsdef.h>
|
|
#include <ssdef.h>
|
|
#include <descrip.h> /* Required for gtmsource.h */
|
|
#include <efndef.h>
|
|
#else
|
|
#error Unsupported platform
|
|
#endif
|
|
#include <errno.h>
|
|
#include "gtm_fcntl.h"
|
|
#include "gtm_unistd.h"
|
|
#include "gtm_inet.h"
|
|
#include "gtm_stdio.h"
|
|
|
|
#include "gdsroot.h"
|
|
#include "gdsblk.h"
|
|
#include "gtm_facility.h"
|
|
#include "fileinfo.h"
|
|
#include "gdsbt.h"
|
|
#include "gdsfhead.h"
|
|
#include "filestruct.h"
|
|
#include "jnl.h"
|
|
#include "repl_msg.h"
|
|
#include "gtmsource.h"
|
|
#include "repl_ctl.h"
|
|
#include "repl_errno.h"
|
|
#include "repl_dbg.h"
|
|
#ifdef UNIX
|
|
#include "gtmio.h"
|
|
#endif
|
|
#include "iosp.h"
|
|
#include "eintr_wrappers.h"
|
|
#include "repl_sp.h"
|
|
#include "tp_change_reg.h"
|
|
#include "is_file_identical.h"
|
|
#include "get_fs_block_size.h"
|
|
#ifdef GTM_CRYPT
|
|
#include "gtmcrypt.h"
|
|
#endif
|
|
#ifdef __MVS__
|
|
#include "gtm_zos_io.h"
|
|
#endif
|
|
|
|
GBLDEF repl_ctl_element *repl_ctl_list = NULL;
|
|
|
|
GBLREF jnlpool_addrs jnlpool;
|
|
GBLREF seq_num seq_num_zero;
|
|
|
|
GBLREF gd_addr *gd_header;
|
|
GBLREF gd_region *gv_cur_region;
|
|
GBLREF sgmnt_addrs *cs_addrs;
|
|
|
|
GBLREF int gtmsource_log_fd;
|
|
GBLREF FILE *gtmsource_log_fp;
|
|
GBLREF int gtmsource_statslog_fd;
|
|
GBLREF FILE *gtmsource_statslog_fp;
|
|
|
|
repl_buff_t *repl_buff_create(uint4 buffsize, uint4 jnl_fs_block_size);

/* Allocate a replication buffer set plus its file-control block.
 *
 * buffsize          : usable size of each read buffer
 * jnl_fs_block_size : block size the buffer start is rounded up to (through ROUND_UP2)
 *
 * Each of the REPL_NUMBUFF buffers is allocated with jnl_fs_block_size extra bytes so that "base" can be
 * moved forward from the raw allocation ("base_buff") to a rounded-up address while still leaving buffsize
 * bytes available. "base_buff" is what must eventually be passed to free().
 * The file-control block ("fc") is allocated here but its fields are left for the caller to fill in.
 *
 * Returns the newly allocated repl_buff_t.
 */
repl_buff_t *repl_buff_create(uint4 buffsize, uint4 jnl_fs_block_size)
{
	repl_buff_t	*rb;
	unsigned char	*raw;
	int		slot;

	rb = (repl_buff_t *)malloc(SIZEOF(repl_buff_t));
	rb->buffindex = REPL_MAINBUFF;
	slot = REPL_MAINBUFF;
	while (slot < REPL_NUMBUFF)
	{
		/* Over-allocate so that the rounded-up start still has buffsize bytes behind it */
		raw = (unsigned char *)malloc(buffsize + jnl_fs_block_size);
		rb->buff[slot].base_buff = raw;
		rb->buff[slot].base = (unsigned char *)ROUND_UP2((uintszofptr_t)raw, jnl_fs_block_size);
		rb->buff[slot].recbuff = rb->buff[slot].base;
		rb->buff[slot].buffremaining = buffsize;
		/* Nothing has been read yet : positions start at the first journal record */
		rb->buff[slot].reclen = 0;
		rb->buff[slot].recaddr = JNL_FILE_FIRST_RECORD;
		rb->buff[slot].readaddr = JNL_FILE_FIRST_RECORD;
		slot++;
	}
	rb->fc = (repl_file_control_t *)malloc(SIZEOF(repl_file_control_t));
	return rb;
}
|
|
|
|
/* Given a journal file name, this function opens that file explicitly without going through "jnl_ensure_open" */
|
|
int repl_open_jnl_file_by_name(repl_ctl_element *tmp_ctl, int jnl_fn_len, char *jnl_fn, int *fd_ptr, void *stat_buf_ptr)
|
|
{
|
|
int tmp_fd;
|
|
int status;
|
|
#ifdef UNIX
|
|
struct stat stat_buf;
|
|
#elif defined(VMS)
|
|
struct FAB fab;
|
|
struct NAM stat_buf;
|
|
#else
|
|
#error Unsupported platform
|
|
#endif
|
|
|
|
tmp_ctl->jnl_fn_len = jnl_fn_len;
|
|
memcpy(tmp_ctl->jnl_fn, jnl_fn, jnl_fn_len);
|
|
tmp_ctl->jnl_fn[jnl_fn_len] = '\0';
|
|
status = SS_NORMAL;
|
|
|
|
/* Open Journal File */
|
|
# ifdef UNIX
|
|
OPENFILE(tmp_ctl->jnl_fn, O_RDONLY, tmp_fd);
|
|
if (0 > tmp_fd)
|
|
{
|
|
status = errno;
|
|
assert(FALSE);
|
|
}
|
|
if (SS_NORMAL == status)
|
|
{
|
|
FSTAT_FILE(tmp_fd, &stat_buf, status);
|
|
if (0 > status)
|
|
{
|
|
status = errno;
|
|
assert(FALSE);
|
|
}
|
|
# ifdef __MVS
|
|
else if (-1 == gtm_zos_tag_to_policy(tmp_fd, TAG_BINARY))
|
|
{
|
|
status = errno;
|
|
assert(FALSE);
|
|
}
|
|
# endif
|
|
}
|
|
*((struct stat *)stat_buf_ptr) = stat_buf;
|
|
# elif defined(VMS)
|
|
fab = cc$rms_fab;
|
|
fab.fab$l_fna = tmp_ctl->jnl_fn;
|
|
fab.fab$b_fns = tmp_ctl->jnl_fn_len;
|
|
fab.fab$l_fop = FAB$M_UFO;
|
|
fab.fab$b_fac = FAB$M_GET | FAB$M_PUT | FAB$M_BIO;
|
|
fab.fab$b_shr = FAB$M_SHRPUT | FAB$M_SHRGET | FAB$M_UPI;
|
|
stat_buf = cc$rms_nam;
|
|
fab.fab$l_nam = &stat_buf;
|
|
fab.fab$l_dna = JNL_EXT_DEF;
|
|
fab.fab$b_dns = SIZEOF(JNL_EXT_DEF) - 1;
|
|
status = sys$open(&fab);
|
|
if (RMS$_NORMAL == status)
|
|
{
|
|
status = SS_NORMAL;
|
|
tmp_fd = fab.fab$l_stv;
|
|
}
|
|
assert(SS_NORMAL == status);
|
|
*((struct NAM *)stat_buf_ptr) = stat_buf;
|
|
# endif
|
|
REPL_DPRINT2("CTL INIT : Direct open of file %s\n", tmp_ctl->jnl_fn);
|
|
*fd_ptr = tmp_fd;
|
|
return status;
|
|
}
|
|
|
|
/* Create and initialize a replication control element for one journal file of region "reg".
 *
 * ctl        : (out) receives the newly allocated control element
 * reg        : region whose journal file is going to be read
 * jnl_fn_len : length of jnl_fn; expected to be 0 when init is TRUE
 * jnl_fn     : name of the journal file to open when init is FALSE
 * init       : TRUE  => open the journal file currently recorded for the region. This goes through
 *                       jnl_ensure_open unless replication is in the WAS_ON state, in which case the file named
 *                       in the database file header is opened directly.
 *              FALSE => open the journal file named by jnl_fn/jnl_fn_len directly.
 *
 * The journal file header is read into a freshly allocated, block-aligned buffer and validated. On any failure
 * the allocations made here are released, the journal file (if it had been opened) is closed, and the error is
 * raised through rts_error; the code following the rts_error call relies on it not returning.
 *
 * Returns SS_NORMAL.
 */
int repl_ctl_create(repl_ctl_element **ctl, gd_region *reg, int jnl_fn_len, char *jnl_fn, boolean_t init)
{
	gd_region		*r_save;
	sgmnt_addrs		*csa;
	sgmnt_data_ptr_t	csd;
	repl_ctl_element	*tmp_ctl = NULL;
	jnl_file_header		*tmp_jfh = NULL;
	jnl_file_header		*tmp_jfh_base = NULL;
	jnl_private_control	*jpc;
	int			tmp_fd = NOJNL;
	int			status;
	int			close_status;
	GTMCRYPT_ONLY(
		int		crypt_status;
	)
	uint4			jnl_status;
	boolean_t		did_jnl_ensure_open = FALSE, was_crit;
	boolean_t		fd_is_open = FALSE;	/* TRUE once tmp_fd is known to be an open journal file */
	int4			lcl_jnl_fn_len;
	char			lcl_jnl_fn[JNL_NAME_SIZE];
#ifdef UNIX
	struct stat		stat_buf;
#elif defined(VMS)
	struct NAM		stat_buf;
	short			iosb[4];	/* needed by the F_READ_BLK_ALIGNED macro */
#else
#error Unsupported platform
#endif
	uint4			jnl_fs_block_size;

	error_def(ERR_JNLFILOPN);

	status = SS_NORMAL;
	jnl_status = 0;

	tmp_ctl = (repl_ctl_element *)malloc(SIZEOF(repl_ctl_element));
	tmp_ctl->reg = reg;
	csa = &FILE_INFO(reg)->s_addrs;
	jpc = csa->jnl;
	if (init)
	{
		assert((0 == jnl_fn_len) && ((NULL == jnl_fn) || ('\0' == jnl_fn[0])));
		r_save = gv_cur_region;
		gv_cur_region = reg;
		tp_change_reg();
		assert(csa == cs_addrs);
		jpc->channel = NOJNL; /* Not to close the prev gener file */
		was_crit = csa->now_crit;
		if (!was_crit)
			grab_crit(reg);
		/* Although replication may be WAS_ON, it is possible that source server has not yet sent records
		 * that were generated when replication was ON. We have to open and read this journal file to
		 * cover such a case. But in the WAS_ON case, do not ask for a jnl_ensure_open to be done since
		 * it will return an error (it will try to open the latest generation journal file and that will
		 * fail because of a lot of reasons e.g. "jpc->cycle" vs "jpc->jnl_buff->cycle" mismatch or db/jnl
		 * tn mismatch JNLTRANSLSS error etc.). This will even cause a journal file switch which the source
		 * server should never do (it is only supposed to READ from journal files). Open the journal file
		 * stored in the database file header (which will be non-NULL even though journaling is currently OFF)
		 * thereby can send as many seqnos as possible until the repl=WAS_ON/jnl=OFF state was reached.
		 */
		csd = csa->hdr;
		assert(REPL_ALLOWED(csd));
		if (!REPL_WAS_ENABLED(csd))
		{
			/* replication is allowed and has not gone into the WAS_ON state so journaling is expected to be ON*/
			assert(JNL_ENABLED(csd));
			did_jnl_ensure_open = TRUE;
			jnl_status = jnl_ensure_open();
			if (0 != jnl_status)
			{
				if (!was_crit)
					rel_crit(reg);
				if (SS_NORMAL != jpc->status)
					rts_error(VARLSTCNT(7) jnl_status, 4, JNL_LEN_STR(csd), DB_LEN_STR(reg), jpc->status);
				else
					rts_error(VARLSTCNT(6) jnl_status, 4, JNL_LEN_STR(csd), DB_LEN_STR(reg));
			} else
			{
				tmp_ctl->jnl_fn_len = csd->jnl_file_len;
				memcpy(tmp_ctl->jnl_fn, csd->jnl_file_name, tmp_ctl->jnl_fn_len);
				tmp_ctl->jnl_fn[tmp_ctl->jnl_fn_len] = '\0';
			}
			/* stash the shared fileid into private storage before rel_crit as it is used in JNL_GDID_PVT macro below */
			VMS_ONLY (jpc->fileid = csa->nl->jnl_file.jnl_file_id;)
			UNIX_ONLY(jpc->fileid = csa->nl->jnl_file.u;)
			REPL_DPRINT2("CTL INIT : Open of file %s thru jnl_ensure_open\n", tmp_ctl->jnl_fn);
			tmp_fd = jpc->channel;
		} else
		{	/* Note that we hold crit so it is safe to pass csd->jnl_file_name (no one else will be changing it) */
			status = repl_open_jnl_file_by_name(tmp_ctl, csd->jnl_file_len, (char *)csd->jnl_file_name,
								&tmp_fd, &stat_buf);
		}
		if (!was_crit)
			rel_crit(reg);
		gv_cur_region = r_save;
		tp_change_reg();
		assert(NOJNL != tmp_fd);
	} else
		status = repl_open_jnl_file_by_name(tmp_ctl, jnl_fn_len, jnl_fn, &tmp_fd, &stat_buf);
	if (status == SS_NORMAL)
	{
		/* Whichever way the file was opened, tmp_fd now refers to an open journal file that this function is
		 * responsible for until it has been handed over to the control element.
		 */
		fd_is_open = TRUE;
		jnl_fs_block_size = get_fs_block_size(tmp_fd);
		/* Because the read below will be to the aligned buffer, and it will read an aligned size, we need
		 * to allocate jnl_file_header + 2 * jnl_fs_block_size. There will be some throw-away before the
		 * buffer to get the alignment in place, and then up to jnl_fs_block_size after the header in order
		 * for the size to be a multiple of jnl_fs_block_size.
		 */
		tmp_jfh_base = (jnl_file_header *)malloc(SIZEOF(jnl_file_header) + (2 * jnl_fs_block_size));
		tmp_jfh = (jnl_file_header *)(ROUND_UP2((uintszofptr_t)tmp_jfh_base, jnl_fs_block_size));
		F_READ_BLK_ALIGNED(tmp_fd, 0, tmp_jfh, ROUND_UP2(REAL_JNL_HDR_LEN, jnl_fs_block_size), status);
		assert(SS_NORMAL == status);
		if (SS_NORMAL == status)
			CHECK_JNL_FILE_IS_USABLE(tmp_jfh, status, FALSE, 0, NULL); /* FALSE => NO gtm_putmsg even if errors */
		assert(SS_NORMAL == status);
	}
	if (SS_NORMAL != status)
	{
		/* We need tmp_ctl->jnl_fn to issue the error but want to free up tmp_ctl before the error.
		 * So copy jnl_fn into local buffer before the error.
		 */
		lcl_jnl_fn_len = tmp_ctl->jnl_fn_len;
		assert(ARRAYSIZE(lcl_jnl_fn) >= ARRAYSIZE(tmp_ctl->jnl_fn));
		assert(lcl_jnl_fn_len < ARRAYSIZE(lcl_jnl_fn));
		memcpy(lcl_jnl_fn, tmp_ctl->jnl_fn, lcl_jnl_fn_len);
		lcl_jnl_fn[lcl_jnl_fn_len] = '\0';
		assert(FALSE);
		free(tmp_ctl);
		tmp_ctl = NULL;
		if (NULL != tmp_jfh_base)
			free(tmp_jfh_base);
		tmp_jfh = NULL;
		tmp_jfh_base = NULL;
		if (fd_is_open)
		{	/* The header read or the usability check failed after the file had been opened. No control
			 * element is going to own the descriptor, so close it here instead of leaking it. "status"
			 * holds the error to report so the close result goes into a separate variable.
			 */
			F_CLOSE(tmp_fd, close_status);
			if (did_jnl_ensure_open)
				jpc->channel = NOJNL;	/* do not leave a valid channel behind outside this function */
		}
		rts_error(VARLSTCNT(7) ERR_JNLFILOPN, 4, lcl_jnl_fn_len, lcl_jnl_fn, DB_LEN_STR(reg), status);
	}
	assert(SS_NORMAL == status);	/* so jnl_fs_block_size is guaranteed to have been initialized */
	tmp_ctl->repl_buff = repl_buff_create(tmp_jfh->alignsize, jnl_fs_block_size);
	tmp_ctl->repl_buff->backctl = tmp_ctl;
	tmp_ctl->repl_buff->fc->eof_addr = JNL_FILE_FIRST_RECORD;
	tmp_ctl->repl_buff->fc->fs_block_size = jnl_fs_block_size;
	tmp_ctl->repl_buff->fc->jfh_base = tmp_jfh_base;
	tmp_ctl->repl_buff->fc->jfh = tmp_jfh;
	tmp_ctl->repl_buff->fc->fd = tmp_fd;
#ifdef GTM_CRYPT
	if (tmp_jfh->is_encrypted)
	{
		INIT_PROC_ENCRYPTION(crypt_status);
		/* If the encryption init failed in db_init, the below MACRO should return an error.
		 * Depending on the error returned, report the error.*/
		GTMCRYPT_GETKEY(tmp_jfh->encryption_hash, tmp_ctl->encr_key_handle, crypt_status);
		if (0 != crypt_status)
			GC_RTS_ERROR(crypt_status, tmp_ctl->jnl_fn);
	}
#endif
	if (did_jnl_ensure_open)
	{
		F_COPY_GDID(tmp_ctl->repl_buff->fc->id, JNL_GDID_PVT(csa));
		/* reset jpc->channel (would have been updated by jnl_ensure_open) as that corresponds to an
		 * actively updated journal file and is only for GT.M and never for source server which only
		 * READS from journal files. Source server anyways has a copy of the fd in tmp_ctl->repl_buff->fc->fd.
		 */
		jpc->channel = NOJNL;
	} else
	{
		F_COPY_GDID_FROM_STAT(tmp_ctl->repl_buff->fc->id, stat_buf); /* For VMS stat_buf is a NAM structure */
	}

	/* Start with an unread file : no sequence numbers known yet and not linked into any list */
	QWASSIGN(tmp_ctl->min_seqno, seq_num_zero);
	QWASSIGN(tmp_ctl->max_seqno, seq_num_zero);
	QWASSIGN(tmp_ctl->seqno, seq_num_zero);
	tmp_ctl->tn = 1;
	tmp_ctl->file_state = JNL_FILE_UNREAD;
	tmp_ctl->lookback = FALSE;
	tmp_ctl->first_read_done = FALSE;
	tmp_ctl->eof_addr_final = FALSE;
	tmp_ctl->max_seqno_final = FALSE;
	tmp_ctl->read_complete = FALSE;
	tmp_ctl->min_seqno_dskaddr = 0;
	tmp_ctl->max_seqno_dskaddr = 0;
	tmp_ctl->next = tmp_ctl->prev = NULL;
	*ctl = tmp_ctl;

	return (SS_NORMAL);
}
|
|
|
|
int gtmsource_ctl_init(void)
|
|
{
|
|
/* Setup ctl for reading from journal files */
|
|
|
|
gd_region *region_top, *reg;
|
|
sgmnt_addrs *csa;
|
|
sgmnt_data_ptr_t csd;
|
|
repl_ctl_element *tmp_ctl, *prev_ctl;
|
|
int jnl_file_len, status;
|
|
|
|
repl_ctl_list = (repl_ctl_element *)malloc(SIZEOF(repl_ctl_element));
|
|
memset((char_ptr_t)repl_ctl_list, 0, SIZEOF(*repl_ctl_list));
|
|
prev_ctl = repl_ctl_list;
|
|
|
|
region_top = gd_header->regions + gd_header->n_regions;
|
|
for (reg = gd_header->regions; reg < region_top; reg++)
|
|
{
|
|
assert(reg->open);
|
|
csa = &FILE_INFO(reg)->s_addrs;
|
|
csd = csa->hdr;
|
|
if (REPL_ALLOWED(csd))
|
|
{
|
|
status = repl_ctl_create(&tmp_ctl, reg, 0, NULL, TRUE);
|
|
assert(SS_NORMAL == status);
|
|
prev_ctl->next = tmp_ctl;
|
|
tmp_ctl->prev = prev_ctl;
|
|
tmp_ctl->next = NULL;
|
|
prev_ctl = tmp_ctl;
|
|
}
|
|
}
|
|
if (NULL == repl_ctl_list->next) /* No replicated region */
|
|
GTMASSERT;
|
|
return (SS_NORMAL);
|
|
}
|
|
|
|
/* Release everything owned by a control element and then the element itself.
 *
 * ctl : element to destroy; NULL is accepted and ignored.
 *
 * Frees each read buffer's raw allocation (base_buff), the journal file header buffer (jfh_base), closes the
 * journal file if fc->fd is not NOJNL, and frees the file-control block, the buffer set and finally ctl.
 * The element is NOT unlinked from any list; that is left to the caller.
 *
 * Returns SS_NORMAL.
 */
int repl_ctl_close(repl_ctl_element *ctl)
{
	repl_buff_t		*rb;
	repl_file_control_t	*fc;
	int			slot;
	int			status;

	if (NULL == ctl)
		return (SS_NORMAL);
	rb = ctl->repl_buff;
	if (NULL != rb)
	{
		slot = REPL_MAINBUFF;
		while (REPL_NUMBUFF > slot)
		{
			if (NULL != rb->buff[slot].base_buff)
				free(rb->buff[slot].base_buff);
			slot++;
		}
		fc = rb->fc;
		if (NULL != fc)
		{
			if (NULL != fc->jfh_base)
				free(fc->jfh_base);
			if (NOJNL != fc->fd)
				F_CLOSE(fc->fd, status); /* resets "fc->fd" to FD_INVALID */
			free(fc);
		}
		free(rb);
	}
	free(ctl);
	return (SS_NORMAL);
}
|
|
|
|
int gtmsource_ctl_close(void)
|
|
{
|
|
repl_ctl_element *ctl;
|
|
sgmnt_addrs *csa;
|
|
int status;
|
|
|
|
if (repl_ctl_list)
|
|
{
|
|
for (ctl = repl_ctl_list->next; NULL != ctl; ctl = repl_ctl_list->next)
|
|
{
|
|
repl_ctl_list->next = ctl->next; /* next element becomes head thereby removing this element,
|
|
the current head from the list; if there is an error path that returns
|
|
us to this function before all elements were freed, we won't try to
|
|
free elements that have been freed already */
|
|
DEBUG_ONLY(
|
|
csa = &FILE_INFO(ctl->reg)->s_addrs;
|
|
/* jpc->channel should never be set to a valid value outside of repl_ctl_create */
|
|
assert((NULL != csa->jnl) && (NOJNL == csa->jnl->channel));
|
|
)
|
|
repl_ctl_close(ctl);
|
|
}
|
|
ctl = repl_ctl_list;
|
|
repl_ctl_list = NULL;
|
|
free(ctl);
|
|
}
|
|
return (SS_NORMAL);
|
|
}
|
|
|
|
int gtmsource_set_lookback(void)
|
|
{
|
|
/* Scan all the region ctl's and set lookback to TRUE if ctl has to be
|
|
* repositioned for a transaction read from the past */
|
|
|
|
repl_ctl_element *ctl;
|
|
|
|
for (ctl = repl_ctl_list->next; NULL != ctl; ctl = ctl->next)
|
|
{
|
|
if ((JNL_FILE_OPEN == ctl->file_state ||
|
|
JNL_FILE_CLOSED == ctl->file_state) &&
|
|
QWLE(jnlpool.gtmsource_local->read_jnl_seqno, ctl->seqno))
|
|
ctl->lookback = TRUE;
|
|
else
|
|
ctl->lookback = FALSE;
|
|
}
|
|
return (SS_NORMAL);
|
|
}
|