/* fis-gtm/sr_port/updproc.c
 *
 * 1119 lines
 * 45 KiB
 * C
 */
/****************************************************************
* *
* Copyright 2001, 2011 Fidelity Information Services, Inc *
* *
* This source code contains the intellectual property *
* of its copyright holder(s), and is made available *
* under a license. If you do not know the terms of *
* the license, please stop and do not read further. *
* *
****************************************************************/
#include "mdef.h"
#include "gtm_stat.h"
#include "gtm_stdlib.h"
#include "gtm_fcntl.h"
#include "gtm_time.h"
#include "gtm_unistd.h"
#include "gtm_stdio.h"
#include "gtm_inet.h"
#include <sys/mman.h>
#include <errno.h>
#ifdef VMS
#include <ssdef.h>
#include <descrip.h> /* Required for gtmsource.h */
#endif
#include "cdb_sc.h"
#include "gtm_string.h"
#include "gdsroot.h"
#include "gdskill.h"
#include "gtm_facility.h"
#include "fileinfo.h"
#include "gdsbt.h"
#include "gdsblk.h"
#include "gdsfhead.h"
#include "gdscc.h"
#include "copy.h"
#include "filestruct.h"
#include "jnl.h"
#include "buddy_list.h" /* needed for tp.h */
#include "hashtab_int4.h" /* needed for muprec.h and tp.h */
#include "hashtab_int8.h" /* needed for muprec.h */
#include "hashtab_mname.h" /* needed for muprec.h */
#include "muprec.h"
#include "tp.h"
#include "iosp.h"
#include "gtmrecv.h"
#include "cli.h"
#include "error.h"
#include "repl_dbg.h"
#include "repl_msg.h"
#include "gtmsource.h"
#include "repl_shutdcode.h"
#include "repl_sp.h"
#include "jnl_write.h"
#ifdef UNIX
#include "repl_instance.h"
#include "gtmio.h"
#endif
#ifdef GTM_TRIGGER
#include "rtnhdr.h" /* for rtn_tabent in gv_trigger.h */
#include "gv_trigger.h"
#include "targ_alloc.h"
#endif
#ifdef VMS
#include <fab.h>
#include <rms.h>
#include <iodef.h>
#include <secdef.h>
#include <psldef.h>
#include <lckdef.h>
#include <syidef.h>
#include <xab.h>
#include <prtdef.h>
#endif
#include "op.h"
#include "svnames.h" /* for SV_ZTWORMHOLE */
#include "gvcst_protos.h" /* for gvcst_init prototype */
#include "read_db_files_from_gld.h"
#include "updproc.h"
#include "tp_change_reg.h"
#include "wcs_flu.h"
#include "repl_log.h"
#include "tp_restart.h"
#include "gtmmsg.h" /* for gtm_putmsg() prototype */
#include "mu_gv_stack_init.h"
#include "jnl_typedef.h"
#include "memcoherency.h"
#include "aswp.h"
#include "jnl_get_checksum.h"
#include "updproc_get_gblname.h"
#include "wcs_recover.h"
#include "have_crit.h"
#include "wbox_test_init.h"
#include "format_targ_key.h"
#include "op_tcommit.h"
#include "error_trap.h"
#include "tp_frame.h"
/* Upper bound on consecutive idle-time buffer flush passes in updproc_actions() before it forces a SHORT_SLEEP
 * (idle_flush_count is reset and the process sleeps once this many flushes happen without an intervening sleep).
 */
#define MAX_IDLE_HARD_SPINS 1000 /* Fail-safe count to avoid hanging CPU in tight loop while it's idle */
/* Process-wide state defined elsewhere (GBLREF) that the update process reads and/or modifies. */
GBLREF uint4 dollar_tlevel;
#ifdef DEBUG
GBLREF uint4 dollar_trestart;
GBLREF boolean_t donot_INVOKE_MUMTSTART;
#endif
GBLREF gv_key *gv_currkey;
GBLREF gd_region *gv_cur_region;
GBLREF sgmnt_addrs *cs_addrs;
GBLREF sgmnt_data_ptr_t cs_data;
GBLREF recvpool_addrs recvpool;
GBLREF jnlpool_addrs jnlpool;
/* temp_jnlpool_ctl is used by updproc_actions() as a scratch copy of jnlpool_ctl while a JRT_NULL record is staged
 * into the journal pool; the updated write offset and jnl_seqno are then copied back into jnlpool_ctl.
 */
GBLREF jnlpool_ctl_ptr_t jnlpool_ctl, temp_jnlpool_ctl;
GBLREF boolean_t is_updproc;
GBLREF seq_num seq_num_zero, seq_num_one;
GBLREF gd_addr *gd_header;
GBLREF boolean_t repl_allowed;
/* Log file that every repl_log() call in this file writes to */
GBLREF FILE *updproc_log_fp;
GBLREF void (*call_on_signal)();
GBLREF sgm_info *first_sgm_info;
GBLREF unsigned int t_tries;
GBLREF unsigned char t_fail_hist[CDB_MAX_TRIES];
GBLREF struct_jrec_tcom tcom_record;
GBLREF gv_namehead *reset_gv_target;
#ifdef VMS
GBLREF struct chf$signal_array *tp_restart_fail_sig;
GBLREF boolean_t tp_restart_fail_sig_used;
#endif
GBLREF boolean_t is_replicator;
GBLREF jnl_gbls_t jgbl;
GBLREF boolean_t gvdupsetnoop; /* if TRUE, duplicate SETs update journal but not database (except for curr_tn++) */
GBLREF boolean_t secondary_side_std_null_coll;
GBLREF boolean_t disk_blk_read;
/* log_interval is loaded from upd_proc_local->log_interval; lastlog_seqno is initialized to
 * (jnl_seqno - log_interval) so that the first transaction processed gets logged.
 */
GBLREF seq_num lastlog_seqno;
GBLREF uint4 log_interval;
#ifdef GTM_TRIGGER
DEBUG_ONLY(GBLREF ch_ret_type (*ch_at_trigger_init)();)
GBLREF int tprestart_state; /* When triggers restart, multiple states possible. See tp_restart.h */
GBLREF dollar_ecode_type dollar_ecode; /* structure containing $ECODE related information */
GBLREF mval dollar_ztwormhole;
#endif
GBLREF boolean_t skip_dbtriggers;
GBLREF gv_namehead *gv_target;
LITREF mval literal_hasht;
/* updproc() keeps re-invoking updproc_actions() for as long as this flag stays TRUE */
static boolean_t updproc_continue = TRUE;
/* Message codes referenced in this file (alphabetical; each declared exactly once). */
error_def(ERR_GBLOFLOW);
error_def(ERR_GVIS);
error_def(ERR_REC2BIG);
error_def(ERR_RECVPOOLSETUP);
error_def(ERR_REPEATERROR);
error_def(ERR_SECONDAHEAD);
error_def(ERR_TEXT);
error_def(ERR_TPRETRY);
error_def(ERR_TRIGDEFNOSYNC);
error_def(ERR_UPDREPLSTATEOFF);
/* updproc_ch: condition handler established by updproc_actions().
 *
 * ERR_TPRETRY:
 *   Logs the sequence number being retried. If a TP restart is still pending (first_sgm_info is non-NULL or,
 *   with triggers, tprestart_state is not TPRESTART_STATE_NORMAL), performs it via tp_restart(). It then
 *   UNWINDs, which drops control back to the loop in updproc() that re-invokes updproc_actions().
 *   If tp_restart() itself ran into an error, the handler does not unwind:
 *     - Unix: SIGNAL no longer equals ERR_TPRETRY, so control falls through to NEXTCH.
 *     - VMS: the signal saved by tp_restart() is copied over the TPRETRY signal array, then NEXTCH.
 * ERR_REPEATERROR (GTM_TRIGGER builds only):
 *   The signal is replaced with dollar_ecode.error_last_ecode, the error rethrown from a trigger, and passed on.
 * Any other signal:
 *   Passed to the next handler via NEXTCH.
 */
CONDITION_HANDLER(updproc_ch)
{
int rc;
/* NOTE(review): these buffers/pointers are not referenced directly below; on some platforms INT8_PRINT /
 * INT8_PRINTX may expand to use them as scratch space for formatting seqnos -- confirm before removing.
 */
unsigned char seq_num_str[32], *seq_num_ptr;
unsigned char seq_num_strx[32], *seq_num_ptrx;
START_CH;
if ((int)ERR_TPRETRY == SIGNAL)
{
# if defined(DEBUG) && defined(DEBUG_UPDPROC_TPRETRY)
assert(FALSE);
# endif
repl_log(updproc_log_fp, TRUE, TRUE, " ----> TPRETRY for sequence number "INT8_FMT" "INT8_FMTX" \n",
INT8_PRINT(recvpool.upd_proc_local->read_jnl_seqno),
INT8_PRINTX(recvpool.upd_proc_local->read_jnl_seqno));
/* This is a kludge. We can come here from 2 places.
* ( i) From a call to t_retry which does a rts_error(ERR_TPRETRY).
* (ii) From updproc_actions() where immediately after op_tcommit we detect that dollar_tlevel is non-zero.
* In the first case, we need to do a tp_restart. In the second, op_tcommit would have already done it for us.
* The way we detect the second case is from the value of first_sgm_info since it is NULLified in tp_restart.
*/
if (first_sgm_info GTMTRIG_ONLY( || (TPRESTART_STATE_NORMAL != tprestart_state)))
{
VMS_ONLY(assert(FALSE == tp_restart_fail_sig_used);)
rc = tp_restart(1, TP_RESTART_HANDLES_ERRORS);
assert(0 == rc); /* No partials restarts can happen at this final level */
GTMTRIG_ONLY(assert(TPRESTART_STATE_NORMAL == tprestart_state));
NON_GTMTRIG_ONLY(assert(INVALID_GV_TARGET == reset_gv_target);)
reset_gv_target = INVALID_GV_TARGET; /* see "trigger_item_tpwrap_ch" similar code for why this is needed */
/* Unwind only if tp_restart() completed without raising a different error of its own */
# ifdef UNIX
if (ERR_TPRETRY == SIGNAL) /* (signal value undisturbed) */
# elif defined VMS
if (!tp_restart_fail_sig_used) /* If tp_restart ran clean */
# else
# error unsupported platform
# endif
{
UNWIND(NULL, NULL);
}
# ifdef VMS
else
{ /* Otherwise tp_restart had a signal that we must now deal with.
* replace the TPRETRY information with that saved from tp_restart.
* first assert that we have room for these arguments and proper setup
*/
assert(TPRESTART_ARG_CNT >= tp_restart_fail_sig->chf$is_sig_args);
memcpy(sig, tp_restart_fail_sig, (tp_restart_fail_sig->chf$l_sig_args + 1) * SIZEOF(int));
tp_restart_fail_sig_used = FALSE;
}
# endif
} else
{ /* Restart already done by op_tcommit (case ii above); just unwind back to updproc() */
UNWIND(NULL, NULL);
}
}
# ifdef GTM_TRIGGER
else if (ERR_REPEATERROR == SIGNAL)
SIGNAL = dollar_ecode.error_last_ecode; /* Error rethrown from a trigger */
# endif
/* Anything not handled above is passed on to the next condition handler */
NEXTCH;
}
/* updproc: entry point of the update process.
 *
 * updproc performs its main processing in updproc_actions(), which is invoked within a loop. Unless there is a
 * TP restart, processing remains inside updproc_actions(). When a resource conflict triggers a TP restart, the
 * condition handler (updproc_ch) unwinds back to the loop here, which reissues the call.
 *
 * Before entering that loop it:
 *   - marks this process as the update process and as a replicator, and turns off the GVDUPSETNOOP optimization;
 *   - (VMS only) sets the process name to one derived from "GTMUPD";
 *   - clears the recvpool structure and calls updproc_init(), issuing RECVPOOLSETUP if an update process
 *     already exists;
 *   - calls mu_gv_stack_init(), then seeds the receive pool read offset and sequence numbers from the starting
 *     journal sequence number returned by updproc_init();
 *   - if replication is allowed, checks the journal pool seqno against the starting seqno:
 *       - VMS: issues SECONDAHEAD if the journal pool is ahead while updates are disabled;
 *       - Unix: only asserts equality, since the SECONDAHEAD check is deferred to the receiver server.
 * After the loop ends it calls updproc_end() and returns SS_NORMAL.
 */
int updproc(void)
{
	seq_num			jnl_seqno;		/* the current jnl_seq no of the Update process */
	seq_num			start_jnl_seqno;	/* seqno to start from, as returned by updproc_init() */
	gld_dbname_list		*gld_db_files;		/* database list returned by updproc_init(), passed to updproc_actions() */
#	ifdef VMS
	uint4			status;			/* sys$setprn() status; only needed (and declared) on VMS */
	char			proc_name[PROC_NAME_MAXLEN + 1];
	struct dsc$descriptor_s	proc_name_desc;
#	endif

	call_on_signal = updproc_sigstop;
	GTMTRIG_DBG_ONLY(ch_at_trigger_init = &updproc_ch);
#	ifdef VMS
	/* Get a meaningful process name */
	proc_name_desc.dsc$w_length = get_proc_name(LIT_AND_LEN("GTMUPD"), getpid(), proc_name);
	proc_name_desc.dsc$a_pointer = proc_name;
	proc_name_desc.dsc$b_dtype = DSC$K_DTYPE_T;
	proc_name_desc.dsc$b_class = DSC$K_CLASS_S;
	if (SS$_NORMAL != (status = sys$setprn(&proc_name_desc)))
		rts_error(VARLSTCNT(7) ERR_RECVPOOLSETUP, 0, ERR_TEXT, 2,
			RTS_ERROR_LITERAL("Unable to change update process name"), status);
#	endif
	is_updproc = TRUE;
	is_replicator = TRUE;	/* as update process goes through t_end() and can write jnl recs to the jnlpool for replicated db */
	NON_GTMTRIG_ONLY(skip_dbtriggers = TRUE;)
	/* Disable the GVDUPSETNOOP optimization to avoid multiple updates to the database and journal for duplicate sets.
	 * If duplicate SETs cause multiple updates to the database and journal on the primary, we want to do the same
	 * here in order to keep the jnl_seqno in sync. Note that the primary can run the VIEW "GVDUPSETNOOP" command to
	 * enable/disable this feature on a per-process basis, so it is not under our control. If the primary does run
	 * with this feature enabled, we will not receive journal records for the duplicate sets anyway, so always
	 * running with this flag set to FALSE does not hurt.
	 */
	gvdupsetnoop = FALSE;
	memset((uchar_ptr_t)&recvpool, 0, SIZEOF(recvpool));	/* For util_base_ch and mupip_exit */
	if (UPDPROC_EXISTS == updproc_init(&gld_db_files, &start_jnl_seqno))	/* we got the global directory header already */
		rts_error(VARLSTCNT(6) ERR_RECVPOOLSETUP, 0, ERR_TEXT, 2, RTS_ERROR_LITERAL("Update Process already exists"));
	/* Initialization of all the relevant global data structures and allocation for TP */
	mu_gv_stack_init();
	recvpool.upd_proc_local->read = 0;
	recvpool.recvpool_ctl->std_null_coll = secondary_side_std_null_coll;
	jnl_seqno = start_jnl_seqno;
	UNIX_ONLY(recvpool.recvpool_ctl->max_dualsite_resync_seqno = jgbl.max_dualsite_resync_seqno;)
	recvpool.recvpool_ctl->jnl_seqno = jnl_seqno;
	recvpool.upd_proc_local->read_jnl_seqno = jnl_seqno;
	if (repl_allowed)
	{	/* Check if the secondary is ahead of the primary */
		VMS_ONLY(
			if ((jnlpool_ctl->jnl_seqno > start_jnl_seqno) && jnlpool_ctl->upd_disabled)
			{
				repl_log(updproc_log_fp, TRUE, TRUE,
					"JNLSEQNO last updated by update process = "INT8_FMT" "INT8_FMTX" \n",
					INT8_PRINT(start_jnl_seqno), INT8_PRINTX(start_jnl_seqno));
				repl_log(updproc_log_fp, TRUE, TRUE,
					"JNLSEQNO of last transaction written to journal pool = "INT8_FMT" "INT8_FMTX" \n",
					INT8_PRINT(jnlpool_ctl->jnl_seqno), INT8_PRINTX(jnlpool_ctl->jnl_seqno));
				rts_error(VARLSTCNT(1) ERR_SECONDAHEAD);
			}
		)
		UNIX_ONLY(
			/* The SECONDAHEAD check is performed in the receiver server after it has connected with the source
			 * server. This is because the check is relevant only if the source server is dualsite. That is
			 * not known now but only at connection time. Hence the deferred check.
			 */
			assert(jnlpool_ctl->jnl_seqno == start_jnl_seqno);
		)
	}
	/* Each return from updproc_actions() (e.g. after updproc_ch unwinds on a TP restart) re-enters it */
	while (updproc_continue)
		updproc_actions(gld_db_files);
	updproc_end();
	return SS_NORMAL;
}
void updproc_actions(gld_dbname_list *gld_db_files)
{
mval ts_mv, val_mv;
jnl_record *rec;
uint4 temp_write, temp_read;
enum jnl_record_type rectype;
int4 upd_rec_seqno = 0; /* the total no of journal records excluding TCOM records */
int4 tupd_num; /* the number of tset/tkill/tzkill records encountered */
int4 tcom_num; /* the number of tcom records encountered */
seq_num jnl_seqno, tmpseqno; /* the current jnl_seq no of the Update process */
seq_num last_errored_seqno = 0;
int key_len, rec_len, backptr;
char fn[MAX_FN_LEN];
sm_uc_ptr_t readaddrs; /* start of current rec in pool */
struct_jrec_null null_record;
jnldata_hdr_ptr_t jnl_header;
boolean_t incr_seqno;
seq_num temp_df_seqnum;
uint4 jnl_status = 0;
char *val_ptr;
jnl_string *keystr;
mstr mname;
char *key, *keytop;
enum upd_bad_trans_type bad_trans_type;
recvpool_ctl_ptr_t recvpool_ctl;
upd_proc_local_ptr_t upd_proc_local;
gtmrecv_local_ptr_t gtmrecv_local;
upd_helper_ctl_ptr_t upd_helper_ctl;
sgmnt_addrs *csa;
sgmnt_data_ptr_t csd;
char gv_mname[MAX_KEY_SZ];
unsigned char buff[MAX_ZWR_KEY_SZ], *end, scan_char, next_char;
boolean_t log_switched = FALSE;
static seq_num seqnum_diff = 0;
jnl_private_control *jpc;
jnl_buffer_ptr_t jbp;
gld_dbname_list *curr;
gd_region *save_reg;
int4 wtstart_errno;
boolean_t buffers_flushed;
uint4 idle_flush_count = 0; /* Number of times buffers were flushed without an intermediate sleep */
uint4 write_wrap, cntr, last_nullsubs, last_subs, keyend;
UNIX_ONLY(
repl_triple triple;
repl_triple_jnl_ptr_t triplecontent;
)
# ifdef GTM_TRIGGER
uint4 nodeflags;
boolean_t primary_has_trigdef, secondary_has_trigdef;
const char *trigdef_inst = NULL, *no_trigdef_inst = NULL;
# endif
DCL_THREADGBL_ACCESS;
SETUP_THREADGBL_ACCESS;
ESTABLISH(updproc_ch);
recvpool_ctl = recvpool.recvpool_ctl;
upd_proc_local = recvpool.upd_proc_local;
gtmrecv_local = recvpool.gtmrecv_local;
upd_helper_ctl = recvpool.upd_helper_ctl;
temp_read = upd_proc_local->read;
temp_write = recvpool_ctl->write;
readaddrs = recvpool.recvdata_base + temp_read;
upd_rec_seqno = tupd_num = tcom_num = 0;
jnl_seqno = upd_proc_local->read_jnl_seqno;
log_interval = upd_proc_local->log_interval;
lastlog_seqno = jnl_seqno - log_interval; /* to ensure we log the first transaction */
while (TRUE)
{
incr_seqno = FALSE;
if (repl_allowed)
{ /* An UPDREPLSTATEOFF error is issued BEFORE applying an update to a non-replicated database.
* But it is possible that replication was turned ON at that time (since we dont hold crit
* at the time we do the UPDREPLSTATEOFF check) but later got turned OFF (for example due to
* no disk space for journal files etc.) just before the actual application of the update.
* In that case, the UPDREPLSTATEOFF error would not have been issued but there is still an
* issue in that an update from the primary got applied to a non-replicated database on the
* secondary. This will be caught by the below test which checks that the journal seqno of
* the secondary AFTER applying each incoming transaction matches the journal seqno of the
* next incoming transaction.
*/
temp_df_seqnum = upd_proc_local->read_jnl_seqno - jnlpool_ctl->jnl_seqno;
if ((0 != temp_df_seqnum) && (seqnum_diff != temp_df_seqnum))
{
seqnum_diff = temp_df_seqnum;
repl_log(updproc_log_fp, TRUE, TRUE,
"JNLSEQNO last updated by update process = "INT8_FMT" "INT8_FMTX" \n",
INT8_PRINT(upd_proc_local->read_jnl_seqno),
INT8_PRINTX(upd_proc_local->read_jnl_seqno));
repl_log(updproc_log_fp, TRUE, TRUE,
"JNLSEQNO of last transaction written to journal pool = "INT8_FMT" "INT8_FMTX" \n",
INT8_PRINT(jnlpool_ctl->jnl_seqno), INT8_PRINTX(jnlpool_ctl->jnl_seqno));
repl_log(updproc_log_fp, TRUE, TRUE, "Number of transactions from Primary which did NOT update "
"JNLSEQNO on Secondary is "INT8_FMT" "INT8_FMTX" \n",
INT8_PRINT(temp_df_seqnum), INT8_PRINTX(temp_df_seqnum));
}
}
if (SHUTDOWN == upd_proc_local->upd_proc_shutdown)
break;
if (GTMRECV_NO_RESTART != gtmrecv_local->restart)
{
/* wait for restart to become GTMRECV_NO_RESTART (set by the Receiver Server) */
if (GTMRECV_RCVR_RESTARTED == gtmrecv_local->restart)
{
recvpool.recvpool_ctl->jnl_seqno = jnl_seqno;
readaddrs = recvpool.recvdata_base;
upd_proc_local->read = 0;
gtmrecv_local->restart = GTMRECV_UPD_RESTARTED;
upd_helper_ctl->first_done = FALSE;
upd_helper_ctl->pre_read_offset = 0;
temp_read = 0;
temp_write = 0;
if (0 < tupd_num)
{
assert(donot_INVOKE_MUMTSTART);
assert(dollar_tlevel);
OP_TROLLBACK(0);
tupd_num = 0;
}
}
SHORT_SLEEP(10);
continue;
}
if (upd_proc_local->changelog)
{
if (upd_proc_local->changelog & REPLIC_CHANGE_UPD_LOGINTERVAL)
{
repl_log(updproc_log_fp, TRUE, TRUE, "Changing log interval from %u to %u\n",
log_interval, upd_proc_local->log_interval);
log_interval = upd_proc_local->log_interval;
lastlog_seqno = jnl_seqno - log_interval; /* force logging of the first transaction after the
* change in log interval */
}
if (upd_proc_local->changelog & REPLIC_CHANGE_LOGFILE)
{
log_switched = TRUE;
upd_log_init(UPDPROC);
}
if ( log_switched == TRUE )
repl_log(updproc_log_fp, TRUE, TRUE, "Change log to %s successful\n",
recvpool.upd_proc_local->log_file);
upd_proc_local->changelog = 0;
}
/* Assert that dollar_tlevel & tupd_num are in sync with each other. The only exception is if dollar_tlevel
* is non-zero, it is possible tupd_num is 0 in case we come here after a TP restart.
*/
assert((dollar_tlevel && (tupd_num || dollar_trestart)) || !dollar_tlevel && !tupd_num);
if (upd_proc_local->bad_trans
|| (!dollar_tlevel && (FALSE == recvpool.recvpool_ctl->wrapped)
&& (temp_write = recvpool.recvpool_ctl->write) == upd_proc_local->read))
{ /* bad-trans OR nothing to process. In case of the former, wait until receiver resets bad_trans.
* In the latter, wait until data is available in the receive pool.
* Note that we check two shared memory fields "recvpool_ctl->wrapped" and "recvpool_ctl->write" in
* that order. The receiver server updates them in the opposite order when it sets "wrapped" to TRUE.
* So it is possible we read an uptodate value of "write" but an outofdate value of "wrapped". This
* means we could incorrectly conclude the pool is empty when actually it is full. But we expect these
* situations to be rare enough that it is ok to do an idle buffer flush in this case. We will eventually
* get to see the uptodate value of "wrapped" at which point we will move on to process the transactions.
*/
assert((0 == recvpool.recvpool_ctl->jnl_seqno) || (jnl_seqno <= recvpool.recvpool_ctl->jnl_seqno));
/* the 0 == check takes care of the startup case where jnl_seqno is 0 in the recvpool_ctl */
/* If any dirty buffers, write them out since nothing else is happening now.
* wcs_wtstart only writes a few buffer a call and putting the call in a loop would flush the
* entire dirty buffer set, but there is no need for a loop around the call since we're already in
* the main updproc loop and this is the only place it really sleeps. */
save_reg = gv_cur_region;
/* Make sure cs_addrs and cs_data are in sync with gv_cur_region before TP_CHANGE_REG changes them */
assert((NULL == gv_cur_region) || (cs_addrs == &FILE_INFO(gv_cur_region)->s_addrs));
assert((NULL == gv_cur_region) || (cs_data == cs_addrs->hdr));
buffers_flushed = FALSE;
for (curr = gld_db_files; NULL != curr; curr = curr->next)
{
TP_CHANGE_REG(curr->gd);
if (cs_addrs->nl->wcs_active_lvl && (FALSE == gv_cur_region->read_only))
{
DCLAST_WCS_WTSTART(gv_cur_region, 0, wtstart_errno);
/* DCLAST_WCS_WTSTART macro does not set the wtstart_errno variable in VMS. But in
* any case, we do not support database file extensions with MM on VMS. So we could
* never get a ERR_GBLOFLOW error there. Therefore the file extension check below is
* done only in Unix.
*/
UNIX_ONLY(
if ((dba_mm == cs_data->acc_meth) && (ERR_GBLOFLOW == wtstart_errno))
wcs_recover(gv_cur_region);
)
buffers_flushed = TRUE;
}
}
TP_CHANGE_REG(save_reg);
/* To avoid a potential infinite cpu-bound loop if the wcs_active_lvl field is bogus and wcs_wtstart()
* returns immediately, we count the number of times a buffer flush has (potentially) occurred. When
* this count gets to an arbitrarily "large" value or there were no buffers to flush, we will sleep
* and start the count over again. If there are more than the "large" number of dirty buffers to flush,
* this logic only causes an extra benign sleep whenever the count is "large". */
if (buffers_flushed && (MAX_IDLE_HARD_SPINS > idle_flush_count))
idle_flush_count++;
else
{
idle_flush_count = 0;
SHORT_SLEEP(10);
}
continue;
}
idle_flush_count = 0;
/* The update process reads "recvpool_ctl->write" first and assumes that all data in the receive pool
* that it then reads (upto the "write" offset) is valid. In order for this assumption to hold good, the
* receiver server needs to do a write memory barrier after updating the receive pool data but before
* updating "write". The update process does a read memory barrier after reading "write" but before reading
* the receive pool data. Not enforcing this read order would mean we would have seen an updated "write"
* but not yet see the updated receive pool data which would mean whatever data we read from the receive pool
* will be stale (which if successfully processed could cause primary-secondary dbs to get out of sync).
*/
SHM_READ_MEMORY_BARRIER;
/* To take the wrapping of buffer in case of over flow ------------ */
/* assume receiver will update wrapped even for exact overflows */
write_wrap = recvpool.recvpool_ctl->write_wrap;
if (temp_read >= write_wrap)
{
assert(temp_read == write_wrap);
if (0 < tupd_num) /* receive pool cannot wrap in the middle of TP */
GTMASSERT; /* see process_tr_buff in gtmrecv_process for why */
if (FALSE == recvpool.recvpool_ctl->wrapped)
{ /* Update process in keeping up with receiver server, notices that there was a wrap
* (thru write and write_wrap). It has to wait till receiver sets wrapped.
*/
SHORT_SLEEP(1);
continue;
}
DEBUG_ONLY(
repl_log(updproc_log_fp, TRUE, FALSE,
"-- Wrapping -- read = %ld :: write_wrap = %ld :: upd_jnl_seqno = "INT8_FMT" "INT8_FMTX" \n",
temp_read, write_wrap, INT8_PRINT(jnl_seqno),INT8_PRINTX(jnl_seqno));
repl_log(updproc_log_fp, TRUE, TRUE,
"-------------> wrapped = %ld :: write = %ld :: recv_jnl_seqno = "INT8_FMT" "INT8_FMTX" \n",
recvpool.recvpool_ctl->wrapped, recvpool.recvpool_ctl->write,
INT8_PRINT(recvpool.recvpool_ctl->jnl_seqno),
INT8_PRINTX(recvpool.recvpool_ctl->jnl_seqno));
)
/* The update process reads (a) "recvpool_ctl->wrapped" first and then reads (b) "recvpool_ctl->write".
* If "wrapped" is TRUE, it assumes that "write" will never hold a stale value that corresponds to a
* previous state of "wrapped" For this assumption to hold good, the receiver server needs to do a write
* memory barrier after updating "write" but before updating "wrapped". The update process will do a read
* memory barrier after reading "wrapped" but before reading "write". This way we are guaranteed the
* update process will never see a value of "write" that is at the tail end of the receive pool once it
* sees "wrapped" as TRUE. Not having this guarantee means we would have wrapped to read from the
* beginning of the receive pool but will continue to see "write" at the tail of the receive pool and will
* therefore treat almost the entire receive pool as containing unprocessed data when it is not the case.
*/
SHM_READ_MEMORY_BARRIER;
temp_read = 0;
upd_proc_local->read = 0;
recvpool.recvpool_ctl->wrapped = FALSE;
readaddrs = recvpool.recvdata_base;
temp_write = recvpool.recvpool_ctl->write;
if (0 == temp_write)
continue; /* Receiver server wrapped but hasn't yet written anything into the pool */
}
rec = (jnl_record *)readaddrs;
rectype = (enum jnl_record_type)rec->prefix.jrec_type;
rec_len = rec->prefix.forwptr;
assert(IS_REPLICATED(rectype));
UNIX_ONLY(
if (JRT_TRIPLE == rectype)
{ /* Source server has sent a REPL_NEW_TRIPLE message in the middle of logical journal records.
* Construct the triple from the input message and add it to the replication instance file.
*/
repl_log(updproc_log_fp, TRUE, TRUE, "Processing REPL_NEW_TRIPLE message\n");
triplecontent = (repl_triple_jnl_ptr_t)readaddrs;
memset(&triple, 0, SIZEOF(triple));
triple.start_seqno = triplecontent->start_seqno;
memcpy(triple.root_primary_instname, triplecontent->instname, MAX_INSTNAME_LEN - 1);
triple.root_primary_cycle = triplecontent->cycle;
memcpy(triple.rcvd_from_instname, triplecontent->rcvd_from_instname, MAX_INSTNAME_LEN - 1);
repl_log(updproc_log_fp, TRUE, TRUE, "New Triple Content : Start Seqno = %llu [0x%llx] : "
"Root Primary = [%s] : Cycle = [%d] : Received from instance = [%s]\n",
triple.start_seqno, triple.start_seqno, triple.root_primary_instname,
triple.root_primary_cycle, triple.rcvd_from_instname);
if (SIZEOF(repl_triple_jnl_t) != rec_len)
{
bad_trans_type = upd_bad_triple_len;
assert(FALSE);
} else if (triple.start_seqno != recvpool.upd_proc_local->read_jnl_seqno)
{
bad_trans_type = upd_bad_triple_start_seqno1;
assert(FALSE);
} else if (triple.start_seqno > recvpool_ctl->jnl_seqno)
{
bad_trans_type = upd_bad_triple_start_seqno2;
assert(FALSE);
} else
bad_trans_type = upd_good_record;
if (upd_good_record != bad_trans_type)
{ /* Signal a BADTRANS */
repl_log(updproc_log_fp, TRUE, TRUE,
"-> Bad trans :: bad_trans_type = %ld type = %ld len = %ld "
"start_seqno = %llu [0x%llx] upd_read_seqno = %llu [0x%llx] "
"recvpool_jnl_seqno = %llu [0x%llx]\n",
bad_trans_type, rectype, rec_len, triple.start_seqno, triple.start_seqno,
recvpool.upd_proc_local->read_jnl_seqno, recvpool.upd_proc_local->read_jnl_seqno,
recvpool_ctl->jnl_seqno, recvpool_ctl->jnl_seqno);
upd_proc_local->bad_trans = TRUE;
assert(!dollar_tlevel);
if (dollar_tlevel)
{
repl_log(updproc_log_fp, TRUE, TRUE, "OP_TROLLBACK IS CALLED "
"-->Bad trans :: dollar_tlevel = %ld\n", dollar_tlevel);
OP_TROLLBACK(0);
}
readaddrs = recvpool.recvdata_base;
upd_proc_local->read = 0;
temp_read = 0;
temp_write = 0;
upd_rec_seqno = tupd_num = tcom_num = 0;
continue;
}
/* Now that we have constructed the triple, add it to the instance file. */
repl_inst_ftok_sem_lock();
repl_inst_triple_add(&triple);
repl_inst_ftok_sem_release();
/* Update pointers to indicate this record is now processed and move on to the next. */
readaddrs += rec_len;
temp_read += rec_len;
upd_proc_local->read = temp_read;
continue;
}
)
/* NOTE: All journal sequence number fields are at same offset */
if (ROUND_DOWN2(rec_len, JNL_REC_START_BNDRY) != rec_len)
{ /* We need that so REC_LEN_FROM_SUFFIX does not access unaligned int */
bad_trans_type = upd_bad_forwptr;
assert(FALSE);
} else if ((0 == rec_len) || (rec_len != (backptr = REC_LEN_FROM_SUFFIX(readaddrs, rec_len))))
{
bad_trans_type = upd_bad_backptr;
assert(FALSE);
} else if (!IS_REPLICATED(rectype))
{
bad_trans_type = upd_rec_not_replicated;
assert(FALSE);
} else if (jnl_seqno != rec->jrec_null.jnl_seqno)
{
bad_trans_type = upd_bad_jnl_seqno;
assert(FALSE);
} else
{
bad_trans_type = upd_good_record;
assert((jnl_seqno < recvpool.recvpool_ctl->jnl_seqno) ||
(0 == recvpool.recvpool_ctl->jnl_seqno));
if (JRT_TCOM == rectype)
{
assert(0 != upd_rec_seqno); /* we know TCOM is not created without a SET/KILL/ZKILL */
if (0 != upd_rec_seqno)
tcom_num++;
} else if (IS_SET_KILL_ZKILL_ZTRIG(rectype))
{
assert(!IS_ZTP(rectype));
keystr = (jnl_string *)&rec->jrec_set_kill.mumps_node;
GTMTRIG_ONLY(
nodeflags = keystr->nodeflags;
TRIG_PROCESS_JNL_STR_NODEFLAGS(nodeflags);
)
key_len = keystr->length; /* local copy of shared recvpool key */
if ((MAX_KEY_SZ >= key_len && 0 < key_len && 0 == keystr->text[key_len - 1]) &&
(upd_good_record == updproc_get_gblname(keystr->text, key_len, gv_mname, &mname)))
{
if (IS_SET(rectype))
{
val_mv.mvtype = MV_STR;
val_ptr = &keystr->text[keystr->length];
GET_MSTR_LEN(val_mv.str.len, val_ptr); /* length of value validated later */
val_mv.str.addr = val_ptr + SIZEOF(mstr_len_t);
}
if (IS_FENCED(rectype))
{
if (IS_TP(rectype))
{
assert(0 <= tupd_num);
assert(0 == tcom_num);
if (0 > tupd_num || 0 != tcom_num)
{
bad_trans_type = upd_fence_bad_t_num;
assert(FALSE);
} else if (IS_TUPD(rectype))
{
ts_mv.mvtype = MV_STR;
ts_mv.str.len = 0;
ts_mv.str.addr = NULL;
assert((!dollar_tlevel && !tupd_num) || dollar_tlevel
&& (tupd_num || dollar_trestart));
if (!dollar_tlevel)
{
assert(!donot_INVOKE_MUMTSTART);
DEBUG_ONLY(donot_INVOKE_MUMTSTART = TRUE);
op_tstart(IMPLICIT_TSTART, TRUE, &ts_mv, 0);
/* 0 ==> save no locals but RESTART OK */
}
tupd_num++;
}
upd_rec_seqno++;
} else if (IS_FUPD(rectype))
op_ztstart();
} else if (0 != tupd_num)
{
bad_trans_type = upd_nofence_bad_tupd_num;
assert(FALSE);
}
} else
{
bad_trans_type = upd_bad_key;
assert(FALSE);
}
} else if (IS_ZTWORM(rectype))
{
assert(IS_FENCED(rectype));
assert(IS_TP(rectype));
assert(0 <= tupd_num);
assert(0 == tcom_num);
if (0 > tupd_num || 0 != tcom_num)
{
bad_trans_type = upd_fence_bad_ztworm_t_num;
assert(FALSE);
} else if (IS_TUPD(rectype))
{
ts_mv.mvtype = MV_STR;
ts_mv.str.len = 0;
ts_mv.str.addr = NULL;
assert((!dollar_tlevel && !tupd_num) || dollar_tlevel && (tupd_num || dollar_trestart));
if (!dollar_tlevel)
/* 0 ==> save no locals but RESTART OK */
op_tstart(IMPLICIT_TSTART, TRUE, &ts_mv, 0);
tupd_num++;
}
upd_rec_seqno++;
}
}
if (upd_good_record == bad_trans_type)
{
if (JRT_NULL == rectype)
{
save_reg = gv_cur_region;
gv_cur_region = gld_db_files->gd;
tp_change_reg();
csa = cs_addrs;
assert(!csa->hold_onto_crit);
assert(!csa->now_crit);
grab_crit(gv_cur_region);
grab_lock(jnlpool.jnlpool_dummy_reg);
temp_jnlpool_ctl->write_addr = jnlpool_ctl->write_addr;
temp_jnlpool_ctl->write = jnlpool_ctl->write;
temp_jnlpool_ctl->jnl_seqno = jnlpool_ctl->jnl_seqno;
assert((temp_jnlpool_ctl->write_addr % temp_jnlpool_ctl->jnlpool_size) == temp_jnlpool_ctl->write);
jgbl.cumul_jnl_rec_len = SIZEOF(jnldata_hdr_struct) + NULL_RECLEN;
temp_jnlpool_ctl->write += SIZEOF(jnldata_hdr_struct);
if (temp_jnlpool_ctl->write >= temp_jnlpool_ctl->jnlpool_size)
{
assert(temp_jnlpool_ctl->write == temp_jnlpool_ctl->jnlpool_size);
temp_jnlpool_ctl->write = 0;
}
jnlpool_ctl->early_write_addr = jnlpool_ctl->write_addr + jgbl.cumul_jnl_rec_len;
/* Source server does not read in crit. It relies on early_write_addr, the transaction
* data, lastwrite_len, write_addr being updated in that order. To ensure this order,
* we have to force out early_write_addr to its coherency point now. If not, the source
* server may read data that is overwritten (or stale). This is true only on
* architectures and OSes that allow unordered memory access
*/
SHM_WRITE_MEMORY_BARRIER;
csa->ti->early_tn = csa->ti->curr_tn + 1;
if (JNL_WRITE_LOGICAL_RECS(csa))
{
SET_GBL_JREC_TIME; /* needed for jnl_put_jrt_pini() */
jpc = csa->jnl;
jbp = jpc->jnl_buff;
/* Before writing to jnlfile, adjust jgbl.gbl_jrec_time if needed to maintain time order
* of jnl records. This needs to be done BEFORE the jnl_ensure_open as that could write
* journal records (if it decides to switch to a new journal file).
*/
ADJUST_GBL_JREC_TIME(jgbl, jbp);
if (JNL_ENABLED(csa))
{
jnl_status = jnl_ensure_open();
if (0 == jnl_status)
{
if (0 == jpc->pini_addr)
jnl_put_jrt_pini(csa);
} else
{
if (SS_NORMAL != jpc->status)
rts_error(VARLSTCNT(7) jnl_status, 4, JNL_LEN_STR(csa->hdr),
DB_LEN_STR(gv_cur_region), jpc->status);
else
rts_error(VARLSTCNT(6) jnl_status, 4, JNL_LEN_STR(csa->hdr),
DB_LEN_STR(gv_cur_region));
}
}
null_record.prefix.pini_addr = (0 == jpc->pini_addr) ? JNL_HDR_LEN : jpc->pini_addr;
null_record.prefix.jrec_type = JRT_NULL;
null_record.prefix.forwptr = null_record.suffix.backptr = NULL_RECLEN;
null_record.prefix.time = jgbl.gbl_jrec_time;
null_record.prefix.tn = csa->ti->curr_tn;
null_record.prefix.checksum = INIT_CHECKSUM_SEED;
null_record.jnl_seqno = jnl_seqno;
null_record.suffix.suffix_code = JNL_REC_SUFFIX_CODE;
JNL_WRITE_APPROPRIATE(csa, jpc, JRT_NULL, (jnl_record *)&null_record, NULL, NULL);
}
temp_jnlpool_ctl->jnl_seqno++;
csa->hdr->reg_seqno = temp_jnlpool_ctl->jnl_seqno;
VMS_ONLY(
csa->hdr->resync_seqno = temp_jnlpool_ctl->jnl_seqno;
csa->hdr->resync_tn = csa->ti->curr_tn;
csa->hdr->old_resync_seqno = temp_jnlpool_ctl->jnl_seqno;
)
UNIX_ONLY(
assert(REPL_PROTO_VER_UNINITIALIZED != gtmrecv_local->last_valid_remote_proto_ver);
if (REPL_PROTO_VER_DUALSITE == gtmrecv_local->last_valid_remote_proto_ver)
csa->hdr->dualsite_resync_seqno = temp_jnlpool_ctl->jnl_seqno;
)
/* the following statements should be atomic */
jnl_header = (jnldata_hdr_ptr_t)(jnlpool.jnldata_base + jnlpool_ctl->write);
jnl_header->jnldata_len = temp_jnlpool_ctl->write - jnlpool_ctl->write +
(temp_jnlpool_ctl->write > jnlpool_ctl->write ? 0 : jnlpool_ctl->jnlpool_size);
jnl_header->prev_jnldata_len = jnlpool_ctl->lastwrite_len;
jnlpool_ctl->lastwrite_len = jnl_header->jnldata_len;
/* For systems with UNORDERED memory access (example, ALPHA, POWER4, PA-RISC 2.0), on a multi
* processor system, it is possible that the source server notices the change in write_addr
* before seeing the change to jnlheader->jnldata_len, leading it to read an invalid
* transaction length. To avoid such conditions, we should commit the order of shared
* memory updates before we update write_addr. This ensures that the source server sees all
* shared memory updates related to a transaction before the change in write_addr
*/
SHM_WRITE_MEMORY_BARRIER;
jnlpool_ctl->write_addr += jnl_header->jnldata_len;
jnlpool_ctl->write = temp_jnlpool_ctl->write;
jnlpool_ctl->jnl_seqno = temp_jnlpool_ctl->jnl_seqno;
INCREMENT_CURR_TN(csa->hdr);
assert(!csa->hold_onto_crit);
rel_lock(jnlpool.jnlpool_dummy_reg);
rel_crit(gv_cur_region);
jgbl.cumul_jnl_rec_len = 0;
incr_seqno = TRUE;
/* Restore gv_cur_region to what it was before the NULL record processing started. op_tstart
* relies on the fact that gv_target->gd_csa and cs_addrs be in sync. If prior to NULL record
* processing, gv_target->gd_csa pointed to AREG and after NULL record processing, gv_cur_region
* points to DEFAULT then the op_tstart assert DBG_CHECK_GVTARGET_CSADDRS_IN_SYNC will fail.
*/
TP_CHANGE_REG(save_reg);
} else if (JRT_TCOM == rectype)
{
if (tcom_num == tupd_num)
{
assert(0 != tcom_num);
memcpy(tcom_record.jnl_tid, rec->jrec_tcom.jnl_tid, TID_STR_SIZE);
op_tcommit();
if (dollar_tlevel)
{ /* op_tcommit restarted the transaction - do update process special
* handling for tpretry. The error below has special handling in a
* few condition handlers because it not so much signals an error
* as it does drive the necessary mechanisms to invoke a restart.
* Consequently this error can be overridden by a "real" error.
* For VMS, the extra parameters are specified to provide "placeholders"
* on the stack in the signal array if a real error needs to be
* overlayed in place of this one (see code in updproc_ch).
* Defined in tp.h, the below issues ERR_TPRETRY.
*/
INVOKE_RESTART;
}
DEBUG_ONLY(donot_INVOKE_MUMTSTART = FALSE);
/* Following two changes to dollar_ztwormhole reset the value of $ztwormhole to
* "undefined" which is important since it points into the receiver pool area
* that is about to be allowed to be overwritten. Prior to the next reference,
* to dollar_ztwormhole, it should be included in a journal record. Note since
* the ISV must always be defined op_svget will turn this state into an empty
* string if somehow $ZTWORMHOLE is referenced before the replicating instance
* receives a new value (like in a jobexam dump).
*/
GTMTRIG_ONLY(dollar_ztwormhole.mvtype = 0);
GTMTRIG_ONLY(dollar_ztwormhole.str.len = 0);
tcom_num = tupd_num = upd_rec_seqno = 0;
incr_seqno = TRUE;
}
} else if (JRT_ZTCOM == rectype)
{
assert(dollar_tlevel);
op_ztcommit(1);
incr_seqno = TRUE;
} else if (IS_SET_KILL_ZKILL_ZTRIG(rectype))
{
key = keystr->text;
UPD_GV_BIND_NAME_APPROPRIATE(gd_header, mname, key, key_len); /* if ^#t do special processing */
/* the above would have set gv_target and gv_cur_region appropriately */
csa = &FILE_INFO(gv_cur_region)->s_addrs;
if (!REPL_ALLOWED(csa))
{ /* Replication/Journaling is NOT enabled on the database file that the current
* global maps to on the secondary even though it was enabled on the corresponding
* database file on the primary. Do NOT allow this update to happen as otherwise
* the journal seqno on the secondary database will get out-of-sync with that of
* the primary database.
*/
gtm_putmsg(VARLSTCNT(6) ERR_UPDREPLSTATEOFF, 4,
mname.len, mname.addr, DB_LEN_STR(gv_cur_region));
/* Shut down the update process normally */
upd_proc_local->upd_proc_shutdown = SHUTDOWN;
break;
}
memcpy(gv_currkey->base, keystr->text, keystr->length);
gv_currkey->base[keystr->length] = 0; /* second null of a key terminator */
gv_currkey->end = keystr->length;
if (gv_currkey->end + 1 > gv_cur_region->max_key_size)
{
bad_trans_type = upd_bad_key_size;
assert(gtm_white_box_test_case_enabled
&& (WBTEST_UPD_PROCESS_ERROR == gtm_white_box_test_case_number));
} else
{
/* Scan the global for two reasons :
* (a) Need to setup gv_currkey->prev as update process can invoke triggers which
* could use naked references which relies on gv_currkey->prev being properly set
* (b) Set gv_some_subsc_null and gv_last_subsc_null accordingly to issue GTM-E-NULSUSBC
* if needed.
*/
key = (char *)(gv_currkey->base);
keyend = gv_currkey->end;
cntr = last_nullsubs = last_subs = 0;
assert((0 < keyend) && (KEY_DELIMITER != *key)); /* we better not have an empty key */
assert((KEY_DELIMITER == key[keyend])
&& (KEY_DELIMITER == key[keyend - 1]));
keytop = (char *)((&gv_currkey->base[0]) + keyend);
scan_char = (unsigned char)(*key);
while (cntr < keyend)
{
if (key >= keytop)
{ /* We should never come here as this would mean that we are attempting
* to cross gv_currkey->base boundary. */
assert(FALSE);
break;
}
assert(scan_char == (unsigned char)(*key)); /* ensure that scan_char was updated in
* the previous iteration*/
next_char = (unsigned char)(*(key + 1));
/* null subscripts are identified based on whether the region has standard null
* collation or default null collation. If former, the sequence is 0x01 0x00.
* If latter, the sequence is 0xFF 0x00
*/
if (last_subs == cntr)
{ /* Beginning of a new subscript. Ensure that if we have standard null
* collation, then we better don't see 0xFF 0x00 at the start of a
* subscript. Similary, if we have default null collation, we better not
* see 0x01 0x00 at the start of a subscript
*/
if (KEY_DELIMITER == next_char)
{
if (STR_SUB_PREFIX == scan_char)
{
assert(!secondary_side_std_null_coll);
last_nullsubs = cntr;
} else if (SUBSCRIPT_STDCOL_NULL == scan_char)
{
assert(secondary_side_std_null_coll);
last_nullsubs = cntr;
}
}
}
cntr++;
if ((KEY_DELIMITER == scan_char) && (KEY_DELIMITER != next_char))
{ /* New subscript. Note down the position for gv_currkey->prev. */
last_subs = cntr;
}
key++;
scan_char = next_char; /* set scan_char for next iteration */
}
assert(cntr == keyend);
assert(last_subs < keyend);
assert(last_nullsubs < keyend);
TREF(gv_some_subsc_null) = (last_nullsubs && (last_nullsubs < last_subs));
TREF(gv_last_subsc_null) = (last_nullsubs && (last_nullsubs == last_subs));
gv_currkey->prev = last_subs;
if (IS_KILL(rectype))
op_gvkill();
else if (IS_ZKILL(rectype))
op_gvzwithdraw();
# ifdef GTM_TRIGGER
else if (IS_ZTRIG(rectype))
op_ztrigger();
# endif
else
{
assert(IS_SET(rectype));
if (keystr->length + 1 + val_mv.str.len + SIZEOF(rec_hdr) >
gv_cur_region->max_rec_size)
{
bad_trans_type = upd_bad_val_size;
assert(gtm_white_box_test_case_enabled
&& (WBTEST_UPD_PROCESS_ERROR == gtm_white_box_test_case_number));
} else
op_gvput(&val_mv);
}
# ifdef GTM_TRIGGER
if (!gv_target->trig_mismatch_test_done)
{
gv_target->trig_mismatch_test_done = TRUE; /* reset only in targ_alloc */
primary_has_trigdef = (0 != (nodeflags & JS_HAS_TRIGGER_MASK));
secondary_has_trigdef = (NULL != gv_target->gvt_trigger);
if (primary_has_trigdef != secondary_has_trigdef)
{ /* trigger definitions out-of-sync between the primary and secondary.
* Issue warning in operator log
*/
if (primary_has_trigdef)
{
trigdef_inst = "originating";
no_trigdef_inst = "replicating";
} else
{
trigdef_inst = "replicating";
no_trigdef_inst = "originating";
}
send_msg(VARLSTCNT(9) ERR_TRIGDEFNOSYNC, 7, mname.len, mname.addr,
LEN_AND_STR(trigdef_inst), LEN_AND_STR(no_trigdef_inst),
&jnl_seqno);
}
}
# endif
if ((upd_good_record == bad_trans_type) && !IS_TP(rectype))
incr_seqno = TRUE;
if (disk_blk_read || 0 >= csa->n_pre_read_trigger)
{
csd = csa->hdr;
upd_helper_ctl->first_done = FALSE;
upd_helper_ctl->pre_read_offset = temp_read + rec_len;
REPL_DPRINT2("pre_read_offset = %x\n", upd_helper_ctl->pre_read_offset);
csa->n_pre_read_trigger = (int)((csd->n_bts * (float)csd->reserved_for_upd /
csd->avg_blks_per_100gbl) * csd->pre_read_trigger_factor / 100.0);
} else
csa->n_pre_read_trigger--;
disk_blk_read = FALSE;
}
} else if (IS_ZTWORM(rectype))
{
assert(dollar_tlevel); /* op_tstart should already have been done */
val_mv.mvtype = MV_STR;
val_mv.str.len = rec->jrec_ztworm.ztworm_str.length;
val_mv.str.addr = &rec->jrec_ztworm.ztworm_str.text[0];
op_svput(SV_ZTWORMHOLE, &val_mv);
}
}
if (upd_good_record != bad_trans_type)
{
tmpseqno = IS_REPLICATED(rectype) ? rec->jrec_null.jnl_seqno : jnl_seqno;
repl_log(updproc_log_fp, TRUE, TRUE,
"-> Bad trans :: bad_trans_type = %ld type = %ld len = %ld backptr = %ld jnl_seqno = %llu "
"[0x%llx]\n", bad_trans_type, rectype, rec_len, backptr, tmpseqno, tmpseqno);
upd_proc_local->bad_trans = TRUE;
/* We dont expect to be holding crit on any region in case of a bad_trans.
* Nevertheless release crit in PRO just in case we hold it.
*/
assert(0 == have_crit(CRIT_HAVE_ANY_REG));
if (dollar_tlevel)
{
repl_log(updproc_log_fp, TRUE, TRUE,
"OP_TROLLBACK IS CALLED -->Bad trans :: dollar_tlevel = %ld\n", dollar_tlevel);
OP_TROLLBACK(0); /* this should also release crit (if any) on all regions in TP */
assert(!dollar_tlevel);
} else
{ /* Non-TP : Release crit if any */
save_reg = gv_cur_region;
if ((NULL != save_reg) && save_reg->open)
{
csa = (sgmnt_addrs *)&FILE_INFO(save_reg)->s_addrs;
assert(NULL != csa); /* since save_reg->open is TRUE */
if (csa->now_crit)
rel_crit(save_reg);
}
}
assert(0 == have_crit(CRIT_HAVE_ANY_REG));
readaddrs = recvpool.recvdata_base;
upd_proc_local->read = 0;
temp_read = 0;
temp_write = 0;
upd_rec_seqno = tupd_num = tcom_num = 0;
/* Throw an error if bad_trans comes for the same sequence number */
if (last_errored_seqno == jnl_seqno)
{
last_errored_seqno = 0;
switch(bad_trans_type)
{
case upd_bad_key_size:
ISSUE_GVSUBOFLOW_ERROR(gv_currkey);
break;
case upd_bad_val_size:
if (0 == (end = format_targ_key(buff, MAX_ZWR_KEY_SZ, gv_currkey, TRUE)))
end = &buff[MAX_ZWR_KEY_SZ - 1];
rts_error(VARLSTCNT(10) ERR_REC2BIG, 4,
gv_currkey->end + 1 + val_mv.str.len + SIZEOF(rec_hdr),
(int4)gv_cur_region->max_rec_size, REG_LEN_STR(gv_cur_region),
ERR_GVIS, 2, end - buff, buff);
break;
}
} else
last_errored_seqno = jnl_seqno;
continue;
}
if (incr_seqno)
{
if (jnl_seqno - lastlog_seqno >= log_interval)
{
repl_log(updproc_log_fp, TRUE, TRUE, "Rectype = %d Committed Jnl seq no is : "
INT8_FMT" "INT8_FMTX" \n", rectype, INT8_PRINT(jnl_seqno), INT8_PRINTX(jnl_seqno));
lastlog_seqno = jnl_seqno;
}
upd_proc_local->read_jnl_seqno = ++jnl_seqno;
}
readaddrs += rec_len;
temp_read += rec_len;
if (0 == tupd_num)
upd_proc_local->read = temp_read;
}
REVERT; /* of updproc_ch() */
updproc_continue = FALSE;
}