static TransApplyAction get_transaction_apply_action(TransactionId xid,
ParallelApplyWorkerInfo **winfo);
+static void replorigin_reset(int code, Datum arg);
+
/*
* Form the origin name for the subscription.
*
}
PG_CATCH();
{
+ /*
+ * Reset the origin state to prevent the advancement of origin
+ * progress if we fail to apply. Otherwise, this will result in
+ * transaction loss as that transaction won't be sent again by the
+ * server.
+ */
+ replorigin_reset(0, (Datum) 0);
+
if (MySubscription->disableonerr)
DisableSubscriptionAndExit();
else
apply_error_callback(void *arg)
{
ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
- int elevel;
if (apply_error_callback_arg.command == 0)
return;
Assert(errarg->origin_name);
- elevel = geterrlevel();
-
- /*
- * Reset the origin state to prevent the advancement of origin progress if
- * we fail to apply. Otherwise, this will result in transaction loss as
- * that transaction won't be sent again by the server.
- */
- if (elevel >= ERROR)
- replorigin_reset(0, (Datum) 0);
-
if (errarg->rel == NULL)
{
if (!TransactionIdIsValid(errarg->remote_xid))
$node_publisher->stop('fast');
$node_subscriber->stop('fast');
+# The bug was that when an ERROR was caught and handled by a (PL/pgSQL)
+# function, the apply worker reset the replication origin but continued
+# processing subsequent changes. As a result, it failed to update the
+# replication origin during further apply operations. This could lead to the
+# apply worker, after a restart, re-requesting changes that had already been
+# applied.
+
+$node_publisher->rotate_logfile();
+$node_publisher->start();
+
+$node_subscriber->rotate_logfile();
+$node_subscriber->start();
+
+# Set up a publication with a table
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ CREATE TABLE t1 (a int);
+ CREATE PUBLICATION regress_pub FOR TABLE t1;
+));
+
+# Set up a subscription that subscribes to the publication
+$node_subscriber->safe_psql(
+ 'postgres', qq(
+ CREATE TABLE t1 (a int);
+ CREATE SUBSCRIPTION regress_sub CONNECTION '$publisher_connstr' PUBLICATION regress_pub;
+));
+
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'regress_sub');
+
+# Create an AFTER INSERT OR UPDATE trigger on the table that raises and
+# subsequently handles an exception. Subsequent insertions will fire this
+# trigger, causing the apply worker to invoke its error callback with an
+# ERROR. However, since the error is caught within the trigger, the apply
+# worker will continue processing changes.
+$node_subscriber->safe_psql(
+ 'postgres', q{
+CREATE FUNCTION handle_exception_trigger()
+RETURNS TRIGGER AS $$
+BEGIN
+ BEGIN
+ -- Raise an exception
+ RAISE EXCEPTION 'This is a test exception';
+ EXCEPTION
+ WHEN OTHERS THEN
+ RETURN NEW;
+ END;
+
+ RETURN NEW;
+END;
+$$ LANGUAGE plpgsql;
+
+CREATE TRIGGER silent_exception_trigger
+AFTER INSERT OR UPDATE ON t1
+FOR EACH ROW
+EXECUTE FUNCTION handle_exception_trigger();
+
+ALTER TABLE t1 ENABLE ALWAYS TRIGGER silent_exception_trigger;
+});
+
+# Obtain current remote_lsn value to check its advancement later
+my $remote_lsn = $node_subscriber->safe_psql('postgres',
+ "SELECT remote_lsn FROM pg_replication_origin_status os, pg_subscription s WHERE os.external_id = 'pg_' || s.oid AND s.subname = 'regress_sub'"
+);
+
+# Insert a tuple to replicate changes
+$node_publisher->safe_psql('postgres', "INSERT INTO t1 VALUES (1);");
+$node_publisher->wait_for_catchup('regress_sub');
+
+# Confirm that the origin can be advanced
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT remote_lsn > '$remote_lsn' FROM pg_replication_origin_status os, pg_subscription s WHERE os.external_id = 'pg_' || s.oid AND s.subname = 'regress_sub'"
+);
+is($result, 't',
+ 'remote_lsn has advanced for apply worker raising an exception');
+
+$node_publisher->stop('fast');
+$node_subscriber->stop('fast');
+
done_testing();