Fix an oversight in 3f28b2fcac.
authorAmit Kapila <akapila@postgresql.org>
Wed, 23 Apr 2025 05:38:24 +0000 (11:08 +0530)
committerAmit Kapila <akapila@postgresql.org>
Wed, 23 Apr 2025 05:38:24 +0000 (11:08 +0530)
Commit 3f28b2fcac tried to ensure that the replication origin shouldn't be
advanced in case of an ERROR in the apply worker, so that it can request
the same data again after restart. However, it is possible that an ERROR
was caught and handled by a (say PL/pgSQL) function, and  the apply worker
continues to apply further changes, in which case, we shouldn't reset the
replication origin.

Ensure to reset the origin only when the apply worker exits after an
ERROR.

Commit 3f28b2fcac added new function geterrlevel, which we removed in HEAD
as part of this commit, but kept it in backbranches to avoid breaking any
applications. A separate case can be made to have such a function even for
HEAD.

Reported-by: Shawn McCoy <shawn.the.mccoy@gmail.com>
Author: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Reviewed-by: vignesh C <vignesh21@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Backpatch-through: 16, where it was introduced
Discussion: https://postgr.es/m/CALsgZNCGARa2mcYNVTSj9uoPcJo-tPuWUGECReKpNgTpo31_Pw@mail.gmail.com

src/backend/replication/logical/worker.c
src/backend/utils/error/elog.c
src/include/utils/elog.h
src/test/subscription/t/100_bugs.pl

index 5ce596f4576b6b1bcb1f1152e7d88d938cb8b39f..4151a4b2a96ba67803895b61aa344e8ab7254d78 100644 (file)
@@ -414,6 +414,8 @@ static inline void reset_apply_error_context_info(void);
 static TransApplyAction get_transaction_apply_action(TransactionId xid,
                                                     ParallelApplyWorkerInfo **winfo);
 
+static void replorigin_reset(int code, Datum arg);
+
 /*
  * Form the origin name for the subscription.
  *
@@ -4516,6 +4518,14 @@ start_apply(XLogRecPtr origin_startpos)
    }
    PG_CATCH();
    {
+       /*
+        * Reset the origin state to prevent the advancement of origin
+        * progress if we fail to apply. Otherwise, this will result in
+        * transaction loss as that transaction won't be sent again by the
+        * server.
+        */
+       replorigin_reset(0, (Datum) 0);
+
        if (MySubscription->disableonerr)
            DisableSubscriptionAndExit();
        else
@@ -5004,23 +5014,12 @@ void
 apply_error_callback(void *arg)
 {
    ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
-   int         elevel;
 
    if (apply_error_callback_arg.command == 0)
        return;
 
    Assert(errarg->origin_name);
 
-   elevel = geterrlevel();
-
-   /*
-    * Reset the origin state to prevent the advancement of origin progress if
-    * we fail to apply. Otherwise, this will result in transaction loss as
-    * that transaction won't be sent again by the server.
-    */
-   if (elevel >= ERROR)
-       replorigin_reset(0, (Datum) 0);
-
    if (errarg->rel == NULL)
    {
        if (!TransactionIdIsValid(errarg->remote_xid))
index d6299633ab792b04dbea75f2e5e0e565d157d2dd..47af743990fe9a5819d165f73bcead833dda67d6 100644 (file)
@@ -1590,23 +1590,6 @@ geterrcode(void)
    return edata->sqlerrcode;
 }
 
-/*
- * geterrlevel --- return the currently set error level
- *
- * This is only intended for use in error callback subroutines, since there
- * is no other place outside elog.c where the concept is meaningful.
- */
-int
-geterrlevel(void)
-{
-   ErrorData  *edata = &errordata[errordata_stack_depth];
-
-   /* we don't bother incrementing recursion_depth */
-   CHECK_STACK_DEPTH();
-
-   return edata->elevel;
-}
-
 /*
  * geterrposition --- return the currently set error position (0 if none)
  *
index a5313c5d2d5ae9f8af623f80b404cecc2ba7ada3..5eac0e16970c3ab306321634caa75899d52b1d8c 100644 (file)
@@ -227,7 +227,6 @@ extern int  internalerrquery(const char *query);
 extern int err_generic_string(int field, const char *str);
 
 extern int geterrcode(void);
-extern int geterrlevel(void);
 extern int geterrposition(void);
 extern int getinternalerrposition(void);
 
index b3924ca4b090ccbe6e28a35e34cbb46821afd74b..5e3577011833b7ba14e264a7f44a13cdbee36ad9 100644 (file)
@@ -498,4 +498,81 @@ is( $node_publisher->psql(
 $node_publisher->stop('fast');
 $node_subscriber->stop('fast');
 
+# The bug was that when an ERROR was caught and handled by a (PL/pgSQL)
+# function, the apply worker reset the replication origin but continued
+# processing subsequent changes. So, we fail to update the replication origin
+# during further apply operations. This can lead to the apply worker requesting
+# the changes that have been applied again after restarting.
+
+$node_publisher->rotate_logfile();
+$node_publisher->start();
+
+$node_subscriber->rotate_logfile();
+$node_subscriber->start();
+
+# Set up a publication with a table
+$node_publisher->safe_psql(
+   'postgres', qq(
+   CREATE TABLE t1 (a int);
+   CREATE PUBLICATION regress_pub FOR TABLE t1;
+));
+
+# Set up a subscription which subscribes the publication
+$node_subscriber->safe_psql(
+   'postgres', qq(
+   CREATE TABLE t1 (a int);
+   CREATE SUBSCRIPTION regress_sub CONNECTION '$publisher_connstr' PUBLICATION regress_pub;
+));
+
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'regress_sub');
+
+# Create an AFTER INSERT trigger on the table that raises and subsequently
+# handles an exception. Subsequent insertions will trigger this exception,
+# causing the apply worker to invoke its error callback with an ERROR. However,
+# since the error is caught within the trigger, the apply worker will continue
+# processing changes.
+$node_subscriber->safe_psql(
+   'postgres', q{
+CREATE FUNCTION handle_exception_trigger()
+RETURNS TRIGGER AS $$
+BEGIN
+   BEGIN
+       -- Raise an exception
+       RAISE EXCEPTION 'This is a test exception';
+   EXCEPTION
+       WHEN OTHERS THEN
+           RETURN NEW;
+   END;
+
+   RETURN NEW;
+END;
+$$ LANGUAGE plpgsql;
+
+CREATE TRIGGER silent_exception_trigger
+AFTER INSERT OR UPDATE ON t1
+FOR EACH ROW
+EXECUTE FUNCTION handle_exception_trigger();
+
+ALTER TABLE t1 ENABLE ALWAYS TRIGGER silent_exception_trigger;
+});
+
+# Obtain current remote_lsn value to check its advancement later
+my $remote_lsn = $node_subscriber->safe_psql('postgres',
+   "SELECT remote_lsn FROM pg_replication_origin_status os, pg_subscription s WHERE os.external_id = 'pg_' || s.oid AND s.subname = 'regress_sub'"
+);
+
+# Insert a tuple to replicate changes
+$node_publisher->safe_psql('postgres', "INSERT INTO t1 VALUES (1);");
+$node_publisher->wait_for_catchup('regress_sub');
+
+# Confirms the origin can be advanced
+$result = $node_subscriber->safe_psql('postgres',
+   "SELECT remote_lsn > '$remote_lsn' FROM pg_replication_origin_status os, pg_subscription s WHERE os.external_id = 'pg_' || s.oid AND s.subname = 'regress_sub'"
+);
+is($result, 't',
+   'remote_lsn has advanced for apply worker raising an exception');
+
+$node_publisher->stop('fast');
+$node_subscriber->stop('fast');
+
 done_testing();