Skip to content

[DNM] postgres_fdw arbitrary queries #29

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: REL_11_CARTO
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/postgres_fdw/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ PG_CPPFLAGS = -I$(libpq_srcdir)
SHLIB_LINK_INTERNAL = $(libpq)

EXTENSION = postgres_fdw
DATA = postgres_fdw--1.0.sql
DATA = postgres_fdw--1.0.sql postgres_fdw--1.0--1.1.sql postgres_fdw--1.1.sql

REGRESS = postgres_fdw

Expand Down
69 changes: 69 additions & 0 deletions contrib/postgres_fdw/expected/postgres_fdw.out
Original file line number Diff line number Diff line change
Expand Up @@ -8801,3 +8801,72 @@ SELECT b, avg(a), max(a), count(*) FROM pagg_tab GROUP BY b HAVING sum(a) < 700

-- Clean-up
RESET enable_partitionwise_aggregate;
-- ===================================================================
-- test postgres_fdw_query(server name, sql text)
-- ===================================================================
-- Most simple SELECT through postgres_fdw_query
SELECT * FROM postgres_fdw_query('loopback', 'SELECT 42') AS t(i int);
i
----
42
(1 row)

-- Select schemas owned by the role configured in the user mapping
SELECT * FROM postgres_fdw_query('loopback', $$SELECT s.nspname
FROM pg_catalog.pg_namespace s
JOIN pg_catalog.pg_user u ON u.usesysid = s.nspowner
WHERE u.usename = current_user
ORDER BY s.nspname$$
) AS schemas(schema_name name);
schema_name
--------------------
S 1
import_dest1
import_dest2
import_dest3
import_dest4
import_dest5
import_source
information_schema
pg_catalog
pg_temp_1
pg_toast
pg_toast_temp_1
public
(13 rows)

-- Select tables and views in a given foreign schema that the role
-- configured in the user mapping has access to
SELECT * FROM postgres_fdw_query('loopback', $$SELECT table_name, table_type
FROM information_schema.tables
WHERE table_schema = 'S 1'
ORDER BY table_name$$
) AS schemas(table_name text, table_type text);
table_name | table_type
------------+------------
T 1 | BASE TABLE
T 2 | BASE TABLE
T 3 | BASE TABLE
T 4 | BASE TABLE
(4 rows)

-- Test we can send commands (e.g: prepared statements)
SELECT * FROM postgres_fdw_query('loopback', $$PREPARE fooplan (int) AS
SELECT $1 + 42$$) AS t(res text);
res
---------
PREPARE
(1 row)

SELECT * FROM postgres_fdw_query('loopback', 'EXECUTE fooplan (1)') AS t(i int);
i
----
43
(1 row)

SELECT * FROM postgres_fdw_query('loopback', 'DEALLOCATE fooplan') AS t(res text);
res
------------
DEALLOCATE
(1 row)

7 changes: 7 additions & 0 deletions contrib/postgres_fdw/postgres_fdw--1.0--1.1.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "CREATE EXTENSION postgres_fdw" to load this file. \quit

CREATE FUNCTION postgres_fdw_query(server name, sql text)
RETURNS SETOF record
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT;
21 changes: 21 additions & 0 deletions contrib/postgres_fdw/postgres_fdw--1.1.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "CREATE EXTENSION postgres_fdw" to load this file. \quit

CREATE FUNCTION postgres_fdw_handler()
RETURNS fdw_handler
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT;

CREATE FUNCTION postgres_fdw_validator(text[], oid)
RETURNS void
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT;

CREATE FUNCTION postgres_fdw_query(server name, sql text)
RETURNS SETOF record
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT;

CREATE FOREIGN DATA WRAPPER postgres_fdw
HANDLER postgres_fdw_handler
VALIDATOR postgres_fdw_validator;
197 changes: 197 additions & 0 deletions contrib/postgres_fdw/postgres_fdw.c
Original file line number Diff line number Diff line change
Expand Up @@ -5900,3 +5900,200 @@ find_em_expr_for_rel(EquivalenceClass *ec, RelOptInfo *rel)
/* We didn't find any suitable equivalence class expression */
return NULL;
}



static void prepTuplestoreResult(FunctionCallInfo fcinfo);

PG_FUNCTION_INFO_V1(postgres_fdw_query);

Datum
postgres_fdw_query(PG_FUNCTION_ARGS)
{
ReturnSetInfo *rsinfo;
Name server_name;
text *sql_text;
char *server;
char *sql;
Oid userid;
PGconn *conn;
UserMapping *user_mapping;
ForeignServer *foreign_server;
PGresult *res = NULL;
TupleDesc tupdesc;
int ntuples;
int nfields;
bool is_sql_cmd;

prepTuplestoreResult(fcinfo);
rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
/* One-time setup code appears here: */

// Get input args
server_name = PG_GETARG_NAME(0);
sql_text = PG_GETARG_TEXT_PP(1);

server = NameStr(*server_name);
sql = text_to_cstring(sql_text);

elog(DEBUG3, "server = %s", server);
elog(DEBUG3, "sql = %s", sql);

// Get a connection to the server with the current user
userid = GetUserId();
foreign_server = GetForeignServerByName(server, false);
user_mapping = GetUserMapping(userid, foreign_server->serverid);
conn = GetConnection(user_mapping, false);

// Execute the sql query
PG_TRY();
{
res = pgfdw_exec_query(conn, sql);

if (PQresultStatus(res) == PGRES_COMMAND_OK)
{
is_sql_cmd = true;

/*
* need a tuple descriptor representing one TEXT column to return
* the command status string as our result tuple
*/
tupdesc = CreateTemplateTupleDesc(1, false);
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
TEXTOID, -1, 0);
ntuples = 1;
nfields = 1;
}
else
{
is_sql_cmd = false;

if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, sql);

nfields = PQnfields(res);

/* get a tuple descriptor for our result type */
switch (get_call_result_type(fcinfo, NULL, &tupdesc))
{
case TYPEFUNC_COMPOSITE:
/* success */
break;
case TYPEFUNC_RECORD:
/* failed to determine actual type of RECORD */
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("function returning record called in context "
"that cannot accept type record")));
break;
default:
/* result type isn't composite */
elog(ERROR, "return type must be a row type");
break;
}

/* make sure we have a persistent copy of the tupdesc */
tupdesc = CreateTupleDescCopy(tupdesc);
ntuples = PQntuples(res);
nfields = PQnfields(res);
}

/* check result and tuple descriptor have the same number of columns */
if (nfields != tupdesc->natts)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("remote query result rowtype does not match "
"the specified FROM clause rowtype")));

if (ntuples > 0)
{
AttInMetadata *attinmeta;
Tuplestorestate *tupstore;
MemoryContext oldcontext;
int row;
char **values;

attinmeta = TupleDescGetAttInMetadata(tupdesc);

oldcontext = MemoryContextSwitchTo(
rsinfo->econtext->ecxt_per_query_memory);
tupstore = tuplestore_begin_heap(true, false, work_mem);
rsinfo->setResult = tupstore;
rsinfo->setDesc = tupdesc;
MemoryContextSwitchTo(oldcontext);

values = (char **) palloc(nfields * sizeof(char *));

/* put all tuples into the tuplestore */
for (row = 0; row < ntuples; row++)
{
HeapTuple tuple;

if (is_sql_cmd)
{
values[0] = PQcmdStatus(res);
}
else
{
int i;

for (i = 0; i < nfields; i++)
{
if (PQgetisnull(res, row, i))
values[i] = NULL;
else
values[i] = PQgetvalue(res, row, i);
}
}

/* build the tuple and put it into the tuplestore. */
tuple = BuildTupleFromCStrings(attinmeta, values);
tuplestore_puttuple(tupstore, tuple);
}

/* clean up and return the tuplestore */
tuplestore_donestoring(tupstore);
}

PQclear(res);
}
PG_CATCH();
{
if (res)
PQclear(res);
PG_RE_THROW();
}
PG_END_TRY();

ReleaseConnection(conn);
return (Datum) 0;
}

/*
* Verify function caller can handle a tuplestore result, and set up for that.
*
* Note: if the caller returns without actually creating a tuplestore, the
* executor will treat the function result as an empty set.
*/
static void
prepTuplestoreResult(FunctionCallInfo fcinfo)
{
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;

/* check to see if query supports us returning a tuplestore */
if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("set-valued function called in context that cannot accept a set")));
if (!(rsinfo->allowedModes & SFRM_Materialize))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("materialize mode required, but it is not allowed in this context")));

/* let the executor know we're sending back a tuplestore */
rsinfo->returnMode = SFRM_Materialize;

/* caller must fill these to return a non-empty result */
rsinfo->setResult = NULL;
rsinfo->setDesc = NULL;
}
2 changes: 1 addition & 1 deletion contrib/postgres_fdw/postgres_fdw.control
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# postgres_fdw extension
comment = 'foreign-data wrapper for remote PostgreSQL servers'
default_version = '1.0'
default_version = '1.1'
module_pathname = '$libdir/postgres_fdw'
relocatable = true
30 changes: 30 additions & 0 deletions contrib/postgres_fdw/sql/postgres_fdw.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2407,3 +2407,33 @@ SELECT b, avg(a), max(a), count(*) FROM pagg_tab GROUP BY b HAVING sum(a) < 700

-- Clean-up
RESET enable_partitionwise_aggregate;


-- ===================================================================
-- test postgres_fdw_query(server name, sql text)
-- ===================================================================

-- Most simple SELECT through postgres_fdw_query
SELECT * FROM postgres_fdw_query('loopback', 'SELECT 42') AS t(i int);

-- Select schemas owned by the role configured in the user mapping
SELECT * FROM postgres_fdw_query('loopback', $$SELECT s.nspname
FROM pg_catalog.pg_namespace s
JOIN pg_catalog.pg_user u ON u.usesysid = s.nspowner
WHERE u.usename = current_user
ORDER BY s.nspname$$
) AS schemas(schema_name name);

-- Select tables and views in a given foreign schema that the role
-- configured in the user mapping has access to
SELECT * FROM postgres_fdw_query('loopback', $$SELECT table_name, table_type
FROM information_schema.tables
WHERE table_schema = 'S 1'
ORDER BY table_name$$
) AS schemas(table_name text, table_type text);

-- Test we can send commands (e.g: prepared statements)
SELECT * FROM postgres_fdw_query('loopback', $$PREPARE fooplan (int) AS
SELECT $1 + 42$$) AS t(res text);
SELECT * FROM postgres_fdw_query('loopback', 'EXECUTE fooplan (1)') AS t(i int);
SELECT * FROM postgres_fdw_query('loopback', 'DEALLOCATE fooplan') AS t(res text);
Loading