Consolidate and improve checking of key-column-attnum arguments for
authorTom Lane <tgl@sss.pgh.pa.us>
Tue, 15 Jun 2010 16:22:58 +0000 (16:22 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Tue, 15 Jun 2010 16:22:58 +0000 (16:22 +0000)
dblink_build_sql_insert() and related functions.  In particular, be sure to
reject references to dropped and out-of-range column numbers.  The numbers
are still interpreted as physical column numbers, though, for backward
compatibility.

This patch replaces Joe's patch of 2010-02-03, which handled only some aspects
of the problem.

contrib/dblink/dblink.c
contrib/dblink/expected/dblink.out

index 7bfdba53f34ed6c3fc68d166058b7c5cb7039165..b5e1c89370bdc6b288637e9fccef51680bb84031 100644 (file)
@@ -78,18 +78,20 @@ static HTAB *createConnHash(void);
 static void createNewConnection(const char *name, remoteConn * con);
 static void deleteConnection(const char *name);
 static char **get_pkey_attnames(Relation rel, int16 *numatts);
-static char *get_sql_insert(Relation rel, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
-static char *get_sql_delete(Relation rel, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals);
-static char *get_sql_update(Relation rel, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
+static char *get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
+static char *get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals);
+static char *get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
 static char *quote_literal_cstr(char *rawstr);
 static char *quote_ident_cstr(char *rawstr);
-static int16 get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key);
-static HeapTuple get_tuple_of_interest(Relation rel, int16 *pkattnums, int16 pknumatts, char **src_pkattvals);
+static int get_attnum_pk_pos(int *pkattnums, int pknumatts, int key);
+static HeapTuple get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals);
 static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode);
 static char *generate_relation_name(Relation rel);
 static char *connstr_strip_password(const char *connstr);
 static void dblink_security_check(PGconn *conn, remoteConn *rcon, const char *connstr);
-static int get_nondropped_natts(Relation rel);
+static void validate_pkattnums(Relation rel,
+                  int16 *pkattnums_arg, int32 pknumatts_arg,
+                  int **pkattnums, int *pknumatts);
 
 /* Global */
 List      *res_id = NIL;
@@ -937,9 +939,6 @@ dblink_get_pkey(PG_FUNCTION_ARGS)
 }
 
 
-#ifndef SHRT_MAX
-#define SHRT_MAX (0x7FFF)
-#endif
 /*
  * dblink_build_sql_insert
  *
@@ -965,9 +964,10 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS)
 {
    Relation    rel;
    text       *relname_text;
-   int16      *pkattnums;
-   int         pknumatts_tmp;
-   int16       pknumatts = 0;
+   int16      *pkattnums_arg;
+   int32       pknumatts_arg;
+   int        *pkattnums;
+   int         pknumatts;
    char      **src_pkattvals;
    char      **tgt_pkattvals;
    ArrayType  *src_pkattvals_arry;
@@ -984,7 +984,6 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS)
    int16       typlen;
    bool        typbyval;
    char        typalign;
-   int         nondropped_natts;
 
    relname_text = PG_GETARG_TEXT_P(0);
 
@@ -993,32 +992,13 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS)
     */
    rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
 
-   pkattnums = (int16 *) PG_GETARG_POINTER(1);
-   pknumatts_tmp = PG_GETARG_INT32(2);
-   if (pknumatts_tmp <= SHRT_MAX)
-       pknumatts = pknumatts_tmp;
-   else
-       ereport(ERROR,
-               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-                errmsg("input for number of primary key " \
-                       "attributes too large")));
-
    /*
-    * There should be at least one key attribute
+    * Process pkattnums argument.
     */
-   if (pknumatts == 0)
-       ereport(ERROR,
-               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-                errmsg("number of key attributes must be > 0")));
-
-   /*
-    * ensure we don't ask for more pk attributes than we have
-    * non-dropped columns
-    */
-   nondropped_natts = get_nondropped_natts(rel);
-   if (pknumatts > nondropped_natts)
-       ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
-               errmsg("number of primary key fields exceeds number of specified relation attributes")));
+   pkattnums_arg = (int16 *) PG_GETARG_POINTER(1);
+   pknumatts_arg = PG_GETARG_INT32(2);
+   validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
+                      &pkattnums, &pknumatts);
 
    src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
    tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
@@ -1127,9 +1107,10 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS)
 {
    Relation    rel;
    text       *relname_text;
-   int16      *pkattnums;
-   int         pknumatts_tmp;
-   int16       pknumatts = 0;
+   int16      *pkattnums_arg;
+   int32       pknumatts_arg;
+   int        *pkattnums;
+   int         pknumatts;
    char      **tgt_pkattvals;
    ArrayType  *tgt_pkattvals_arry;
    int         tgt_ndim;
@@ -1141,7 +1122,6 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS)
    int16       typlen;
    bool        typbyval;
    char        typalign;
-   int         nondropped_natts;
 
    relname_text = PG_GETARG_TEXT_P(0);
 
@@ -1150,32 +1130,13 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS)
     */
    rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
 
-   pkattnums = (int16 *) PG_GETARG_POINTER(1);
-   pknumatts_tmp = PG_GETARG_INT32(2);
-   if (pknumatts_tmp <= SHRT_MAX)
-       pknumatts = pknumatts_tmp;
-   else
-       ereport(ERROR,
-               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-                errmsg("input for number of primary key " \
-                       "attributes too large")));
-
-   /*
-    * There should be at least one key attribute
-    */
-   if (pknumatts == 0)
-       ereport(ERROR,
-               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-                errmsg("number of key attributes must be > 0")));
-
    /*
-    * ensure we don't ask for more pk attributes than we have
-    * non-dropped columns
+    * Process pkattnums argument.
     */
-   nondropped_natts = get_nondropped_natts(rel);
-   if (pknumatts > nondropped_natts)
-       ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
-               errmsg("number of primary key fields exceeds number of specified relation attributes")));
+   pkattnums_arg = (int16 *) PG_GETARG_POINTER(1);
+   pknumatts_arg = PG_GETARG_INT32(2);
+   validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
+                      &pkattnums, &pknumatts);
 
    tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
 
@@ -1254,9 +1215,10 @@ dblink_build_sql_update(PG_FUNCTION_ARGS)
 {
    Relation    rel;
    text       *relname_text;
-   int16      *pkattnums;
-   int         pknumatts_tmp;
-   int16       pknumatts = 0;
+   int16      *pkattnums_arg;
+   int32       pknumatts_arg;
+   int        *pkattnums;
+   int         pknumatts;
    char      **src_pkattvals;
    char      **tgt_pkattvals;
    ArrayType  *src_pkattvals_arry;
@@ -1273,7 +1235,6 @@ dblink_build_sql_update(PG_FUNCTION_ARGS)
    int16       typlen;
    bool        typbyval;
    char        typalign;
-   int         nondropped_natts;
 
    relname_text = PG_GETARG_TEXT_P(0);
 
@@ -1282,32 +1243,13 @@ dblink_build_sql_update(PG_FUNCTION_ARGS)
     */
    rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
 
-   pkattnums = (int16 *) PG_GETARG_POINTER(1);
-   pknumatts_tmp = PG_GETARG_INT32(2);
-   if (pknumatts_tmp <= SHRT_MAX)
-       pknumatts = pknumatts_tmp;
-   else
-       ereport(ERROR,
-               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-                errmsg("input for number of primary key " \
-                       "attributes too large")));
-
-   /*
-    * There should be one source array key values for each key attnum
-    */
-   if (pknumatts == 0)
-       ereport(ERROR,
-               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-                errmsg("number of key attributes must be > 0")));
-
    /*
-    * ensure we don't ask for more pk attributes than we have
-    * non-dropped columns
+    * Process pkattnums argument.
     */
-   nondropped_natts = get_nondropped_natts(rel);
-   if (pknumatts > nondropped_natts)
-       ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
-               errmsg("number of primary key fields exceeds number of specified relation attributes")));
+   pkattnums_arg = (int16 *) PG_GETARG_POINTER(1);
+   pknumatts_arg = PG_GETARG_INT32(2);
+   validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
+                      &pkattnums, &pknumatts);
 
    src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
    tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
@@ -1470,7 +1412,7 @@ get_pkey_attnames(Relation rel, int16 *numatts)
 }
 
 static char *
-get_sql_insert(Relation rel, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
+get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
 {
    char       *relname;
    HeapTuple   tuple;
@@ -1479,7 +1421,7 @@ get_sql_insert(Relation rel, int16 *pkattnums, int16 pknumatts, char **src_pkatt
    StringInfo  str = makeStringInfo();
    char       *sql;
    char       *val;
-   int16       key;
+   int         key;
    int         i;
    bool        needComma;
 
@@ -1526,7 +1468,7 @@ get_sql_insert(Relation rel, int16 *pkattnums, int16 pknumatts, char **src_pkatt
            appendStringInfo(str, ",");
 
        if (tgt_pkattvals != NULL)
-           key = get_attnum_pk_pos(pkattnums, pknumatts, i + 1);
+           key = get_attnum_pk_pos(pkattnums, pknumatts, i);
        else
            key = -1;
 
@@ -1554,7 +1496,7 @@ get_sql_insert(Relation rel, int16 *pkattnums, int16 pknumatts, char **src_pkatt
 }
 
 static char *
-get_sql_delete(Relation rel, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals)
+get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals)
 {
    char       *relname;
    TupleDesc   tupdesc;
@@ -1573,13 +1515,13 @@ get_sql_delete(Relation rel, int16 *pkattnums, int16 pknumatts, char **tgt_pkatt
    appendStringInfo(str, "DELETE FROM %s WHERE ", relname);
    for (i = 0; i < pknumatts; i++)
    {
-       int16       pkattnum = pkattnums[i];
+       int         pkattnum = pkattnums[i];
 
        if (i > 0)
            appendStringInfo(str, " AND ");
 
        appendStringInfo(str, "%s",
-       quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum - 1]->attname)));
+       quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum]->attname)));
 
        if (tgt_pkattvals != NULL)
            val = pstrdup(tgt_pkattvals[i]);
@@ -1604,7 +1546,7 @@ get_sql_delete(Relation rel, int16 *pkattnums, int16 pknumatts, char **tgt_pkatt
 }
 
 static char *
-get_sql_update(Relation rel, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
+get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
 {
    char       *relname;
    HeapTuple   tuple;
@@ -1613,7 +1555,7 @@ get_sql_update(Relation rel, int16 *pkattnums, int16 pknumatts, char **src_pkatt
    StringInfo  str = makeStringInfo();
    char       *sql;
    char       *val;
-   int16       key;
+   int         key;
    int         i;
    bool        needComma;
 
@@ -1644,7 +1586,7 @@ get_sql_update(Relation rel, int16 *pkattnums, int16 pknumatts, char **src_pkatt
                  quote_ident_cstr(NameStr(tupdesc->attrs[i]->attname)));
 
        if (tgt_pkattvals != NULL)
-           key = get_attnum_pk_pos(pkattnums, pknumatts, i + 1);
+           key = get_attnum_pk_pos(pkattnums, pknumatts, i);
        else
            key = -1;
 
@@ -1667,18 +1609,18 @@ get_sql_update(Relation rel, int16 *pkattnums, int16 pknumatts, char **src_pkatt
 
    for (i = 0; i < pknumatts; i++)
    {
-       int16       pkattnum = pkattnums[i];
+       int         pkattnum = pkattnums[i];
 
        if (i > 0)
            appendStringInfo(str, " AND ");
 
        appendStringInfo(str, "%s",
-       quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum - 1]->attname)));
+       quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum]->attname)));
 
        if (tgt_pkattvals != NULL)
            val = pstrdup(tgt_pkattvals[i]);
        else
-           val = SPI_getvalue(tuple, tupdesc, pkattnum);
+           val = SPI_getvalue(tuple, tupdesc, pkattnum + 1);
 
        if (val != NULL)
        {
@@ -1732,8 +1674,8 @@ quote_ident_cstr(char *rawstr)
    return result;
 }
 
-static int16
-get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key)
+static int
+get_attnum_pk_pos(int *pkattnums, int pknumatts, int key)
 {
    int         i;
 
@@ -1748,7 +1690,7 @@ get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key)
 }
 
 static HeapTuple
-get_tuple_of_interest(Relation rel, int16 *pkattnums, int16 pknumatts, char **src_pkattvals)
+get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals)
 {
    char       *relname;
    TupleDesc   tupdesc;
@@ -1779,13 +1721,13 @@ get_tuple_of_interest(Relation rel, int16 *pkattnums, int16 pknumatts, char **sr
 
    for (i = 0; i < pknumatts; i++)
    {
-       int16       pkattnum = pkattnums[i];
+       int         pkattnum = pkattnums[i];
 
        if (i > 0)
            appendStringInfo(str, " AND ");
 
        appendStringInfo(str, "%s",
-       quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum - 1]->attname)));
+       quote_ident_cstr(NameStr(tupdesc->attrs[pkattnum]->attname)));
 
        val = pstrdup(src_pkattvals[i]);
        if (val != NULL)
@@ -2139,23 +2081,48 @@ dblink_security_check(PGconn *conn, remoteConn *rcon, const char *connstr)
    }
 }
 
-static int
-get_nondropped_natts(Relation rel)
+/*
+ * Validate the PK-attnums argument for dblink_build_sql_insert() and related
+ * functions, and translate to the internal representation.
+ *
+ * The user supplies an int2vector of 1-based physical attnums, plus a count
+ * argument (the need for the separate count argument is historical, but we
+ * still check it).  We check that each attnum corresponds to a valid,
+ * non-dropped attribute of the rel.  We do *not* prevent attnums from being
+ * listed twice, though the actual use-case for such things is dubious.
+ *
+ * The internal representation is a palloc'd int array of 0-based physical
+ * attnums.
+ */
+static void
+validate_pkattnums(Relation rel,
+                  int16 *pkattnums_arg, int32 pknumatts_arg,
+                  int **pkattnums, int *pknumatts)
 {
-   int         nondropped_natts = 0;
-   TupleDesc   tupdesc;
-   int         natts;
+   TupleDesc   tupdesc = rel->rd_att;
+   int         natts = tupdesc->natts;
    int         i;
 
-   tupdesc = rel->rd_att;
-   natts = tupdesc->natts;
+   /* Must have at least one pk attnum selected */
+   if (pknumatts_arg <= 0)
+       ereport(ERROR,
+               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                errmsg("number of key attributes must be > 0")));
 
-   for (i = 0; i < natts; i++)
+   /* Allocate output array */
+   *pkattnums = (int *) palloc(pknumatts_arg * sizeof(int));
+   *pknumatts = pknumatts_arg;
+
+   /* Validate attnums and convert to internal form */
+   for (i = 0; i < pknumatts_arg; i++)
    {
-       if (tupdesc->attrs[i]->attisdropped)
-           continue;
-       nondropped_natts++;
-   }
+       int     pkattnum = pkattnums_arg[i];
 
-   return nondropped_natts;
+       if (pkattnum <= 0 || pkattnum > natts ||
+           tupdesc->attrs[pkattnum - 1]->attisdropped)
+           ereport(ERROR,
+                   (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                    errmsg("invalid attribute number %d", pkattnum)));
+       (*pkattnums)[i] = pkattnum - 1;
+   }
 }
index 4c27ad23651d40478e2765a764b8cc6f496307df..cbb272c74f64e6a6c6e8944e6456aee897dae8a7 100644 (file)
@@ -46,7 +46,7 @@ SELECT dblink_build_sql_insert('foo','1 2',2,'{"0", "a"}','{"99", "xyz"}');
 
 -- too many pk fields, should fail
 SELECT dblink_build_sql_insert('foo','1 2 3 4',4,'{"0", "a", "{a0,b0,c0}"}','{"99", "xyz", "{za0,zb0,zc0}"}');
-ERROR:  number of primary key fields exceeds number of specified relation attributes
+ERROR:  invalid attribute number 4
 -- build an update statement based on a local tuple,
 -- replacing the primary key values with new ones
 SELECT dblink_build_sql_update('foo','1 2',2,'{"0", "a"}','{"99", "xyz"}');
@@ -57,7 +57,7 @@ SELECT dblink_build_sql_update('foo','1 2',2,'{"0", "a"}','{"99", "xyz"}');
 
 -- too many pk fields, should fail
 SELECT dblink_build_sql_update('foo','1 2 3 4',4,'{"0", "a", "{a0,b0,c0}"}','{"99", "xyz", "{za0,zb0,zc0}"}');
-ERROR:  number of primary key fields exceeds number of specified relation attributes
+ERROR:  invalid attribute number 4
 -- build a delete statement based on a local tuple,
 SELECT dblink_build_sql_delete('foo','1 2',2,'{"0", "a"}');
            dblink_build_sql_delete           
@@ -67,7 +67,7 @@ SELECT dblink_build_sql_delete('foo','1 2',2,'{"0", "a"}');
 
 -- too many pk fields, should fail
 SELECT dblink_build_sql_delete('foo','1 2 3 4',4,'{"0", "a", "{a0,b0,c0}"}');
-ERROR:  number of primary key fields exceeds number of specified relation attributes
+ERROR:  invalid attribute number 4
 -- retest using a quoted and schema qualified table
 CREATE SCHEMA "MySchema";
 CREATE TABLE "MySchema"."Foo"(f1 int, f2 text, f3 text[], primary key (f1,f2));