from typing import List, Optional, Tuple, Union
from .types import PgObjectType, Privileges
def _get_acl_username(acl: str) -> Tuple[int, str]:
"""Port of ``copyAclUserName`` from ``dumputils.c``"""
i = 0
output = ""
while i < len(acl) and acl[i] != "=":
# If user name isn't quoted, then just add it to the output buffer
if acl[i] != '"':
output += acl[i]
i += 1
else:
# Otherwise, it's a quoted username
i += 1
if i == len(acl):
raise ValueError("ACL syntax error: unterminated quote.")
# Loop until we come across an unescaped quote
while not (acl[i] == '"' and acl[i + 1 : i + 2] != '"'):
# Quoting convention is to escape " as "".
if acl[i] == '"' and acl[i + 1 : i + 2] == '"':
i += 1
output += acl[i]
i += 1
if i == len(acl):
raise ValueError("ACL syntax error: unterminated quote.")
i += 1
return i, output
[docs]
def get_default_privileges(type: PgObjectType, owner: str) -> List[Privileges]:
"""Return a list of :class:`~pg_grant.types.Privileges` objects matching the
default privileges for that type.
This can be called when the ACL item from PostgreSQL is NULL to determine
the implicit access privileges.
.. seealso:: https://www.postgresql.org/docs/10/static/sql-grant.html
"""
# "the owner has all privileges by default"
# "PostgreSQL treats the owner's privileges as having been granted by the
# owner to themselves"
priv_list = [Privileges(grantee=owner, grantor=owner, privs=["ALL"])]
public_privs = None
# "PostgreSQL grants default privileges on some types of objects to PUBLIC.
# No privileges are granted to PUBLIC by default on tables, columns,
# schemas or tablespaces. For other types, the default privileges granted
# to PUBLIC are as follows: CONNECT and CREATE TEMP TABLE for databases;
# EXECUTE privilege for functions; and USAGE privilege for languages."
if type is PgObjectType.DATABASE:
public_privs = ["CONNECT", "TEMPORARY"]
elif type is PgObjectType.FUNCTION:
public_privs = ["EXECUTE"]
elif type is PgObjectType.LANGUAGE:
public_privs = ["USAGE"]
elif type is PgObjectType.TYPE:
# Seems like there's a documentation bug, and that types get USAGE by
# default.
# https://stackoverflow.com/questions/46656644
public_privs = ["USAGE"]
elif type is PgObjectType.DOMAIN:
public_privs = ["USAGE"]
if public_privs:
priv_list.append(
Privileges(grantee="PUBLIC", grantor=owner, privs=public_privs)
)
return priv_list
[docs]
def parse_acl(
acl: Union[List[str], Tuple[str, ...]],
type: Optional[PgObjectType] = None,
subname: Optional[str] = None,
) -> List[Privileges]:
"""
Parameters:
acl: ACL, e.g. ``['alice=arwdDxt/alice', 'bob=arwdDxt/alice']``
type: Optional. If passed, all privileges may be reduced to ``['ALL']``.
subname: Optional, e.g. for column privileges.
Returns:
List of :class:`~.types.Privileges`.
.. seealso::
This is a simple wrapper; :func:`.parse_acl_item` is called for each
item in `acl`.
"""
return [parse_acl_item(i, type, subname) for i in acl]
[docs]
def parse_acl_item(
acl_item: str,
type: Optional[PgObjectType] = None,
subname: Optional[str] = None,
) -> Privileges:
"""Port of ``parseAclItem`` from `dumputils.c`_
Parameters:
acl_item: ACL item, e.g. ``'alice=arwdDxt/bob'``
type: Optional. If passed, all privileges may be reduced to ``['ALL']``.
subname: Optional, e.g. for column privileges. Must be output from
:meth:`psycopg.sql.Identifier` or similar.
.. warning::
If the ``privs`` or ``privswgo`` attributes of the returned object will
be used to construct an SQL statement, `subname` **must be a valid
identifier** (e.g. by calling :meth:`~psycopg.sql.Composable.as_string`
on :class:`psycopg.sql.Identifier`) in order to prevent SQL injection
attacks.
:func:`~pg_grant.sql.grant` and :func:`~pg_grant.sql.revoke` are not
vulnerable, because those functions quote the embedded identifier:
.. code-block:: pycon
>>> from pg_grant import PgObjectType, parse_acl_item
>>> from pg_grant.sql import grant
>>> privs = parse_acl_item("alice=r/bob", subname="user")
>>> privs.privs
['SELECT (user)']
>>> str(grant(privs.privs, PgObjectType.TABLE, "tbl1", "alice"))
'GRANT SELECT ("user") ON TABLE tbl1 TO alice'
Note that ``"user"`` was quoted by :func:`~pg_grant.sql.grant`.
In other cases, make sure to quote `subname`:
.. code-block:: pycon
>>> import psycopg
>>> from psycopg.sql import Identifier
>>> conn = psycopg.connect(...)
>>> parse_acl_item("alice=r/bob", subname=Identifier("user").as_string(conn))
>>> privs.privs
['SELECT ("user")']
Returns:
:class:`~.types.Privileges`
"""
eq_pos, grantee = _get_acl_username(acl_item)
assert acl_item[eq_pos] == "="
if grantee == "":
grantee = "PUBLIC"
slash_pos = acl_item.index("/", eq_pos)
_, grantor = _get_acl_username(acl_item[slash_pos + 1 :])
privs = []
privs_with_grant_option = []
all_with_grant_option = all_without_grant_option = True
priv = acl_item[eq_pos + 1 : slash_pos]
def convert_priv(code: str, keyword: str) -> None:
nonlocal all_with_grant_option, all_without_grant_option
pos = priv.find(code)
if pos >= 0:
if priv[pos + 1 : pos + 2] == "*":
s = keyword
if subname is not None:
s += f" ({subname})"
privs_with_grant_option.append(s)
all_without_grant_option = False
else:
s = keyword
if subname is not None:
s += f" ({subname})"
privs.append(s)
all_with_grant_option = False
else:
all_with_grant_option = all_without_grant_option = False
if type is None:
convert_priv("r", "SELECT")
convert_priv("w", "UPDATE")
convert_priv("a", "INSERT")
convert_priv("d", "DELETE")
convert_priv("D", "TRUNCATE")
convert_priv("x", "REFERENCES")
convert_priv("t", "TRIGGER")
convert_priv("X", "EXECUTE")
convert_priv("U", "USAGE")
convert_priv("C", "CREATE")
convert_priv("c", "CONNECT")
convert_priv("T", "TEMPORARY")
convert_priv("s", "SET")
convert_priv("A", "ALTER SYSTEM")
# Don't think anything can have all of them, but set all to False
# since we don't know type.
all_with_grant_option = all_without_grant_option = False
elif type in {PgObjectType.TABLE, PgObjectType.SEQUENCE}:
convert_priv("r", "SELECT")
convert_priv("w", "UPDATE")
if type is PgObjectType.SEQUENCE:
convert_priv("U", "USAGE")
else:
convert_priv("a", "INSERT")
convert_priv("x", "REFERENCES")
if subname is None:
convert_priv("d", "DELETE")
convert_priv("t", "TRIGGER")
convert_priv("D", "TRUNCATE")
elif type is PgObjectType.FUNCTION:
convert_priv("X", "EXECUTE")
elif type is PgObjectType.LANGUAGE:
convert_priv("U", "USAGE")
elif type is PgObjectType.SCHEMA:
convert_priv("C", "CREATE")
convert_priv("U", "USAGE")
elif type is PgObjectType.DATABASE:
convert_priv("C", "CREATE")
convert_priv("c", "CONNECT")
convert_priv("T", "TEMPORARY")
elif type is PgObjectType.TABLESPACE:
convert_priv("C", "CREATE")
elif type is PgObjectType.TYPE:
convert_priv("U", "USAGE")
elif type is PgObjectType.DOMAIN:
convert_priv("U", "USAGE")
elif type is PgObjectType.FOREIGN_DATA_WRAPPER:
convert_priv("U", "USAGE")
elif type is PgObjectType.FOREIGN_SERVER:
convert_priv("U", "USAGE")
elif type is PgObjectType.FOREIGN_TABLE:
convert_priv("r", "SELECT")
elif type is PgObjectType.LARGE_OBJECT:
convert_priv("r", "SELECT")
convert_priv("w", "UPDATE")
elif type is PgObjectType.PARAMETER:
convert_priv("s", "SET")
convert_priv("A", "ALTER SYSTEM")
else:
raise ValueError(f"Unknown type: {type}")
if all_with_grant_option:
privs = []
privs_with_grant_option = []
s = "ALL"
if subname is not None:
s += f" ({subname})"
privs_with_grant_option.append(s)
elif all_without_grant_option:
privs = []
privs_with_grant_option = []
s = "ALL"
if subname is not None:
s += f" ({subname})"
privs.append(s)
return Privileges(grantee, grantor, privs, privs_with_grant_option)