Source code for pg_grant.parse

from typing import List, Optional, Tuple, Union

from .types import PgObjectType, Privileges


def _get_acl_username(acl: str) -> Tuple[int, str]:
    """Port of ``copyAclUserName`` from ``dumputils.c``"""
    i = 0
    output = ""

    while i < len(acl) and acl[i] != "=":
        # If user name isn't quoted, then just add it to the output buffer
        if acl[i] != '"':
            output += acl[i]
            i += 1
        else:
            # Otherwise, it's a quoted username
            i += 1

            if i == len(acl):
                raise ValueError("ACL syntax error: unterminated quote.")

            # Loop until we come across an unescaped quote
            while not (acl[i] == '"' and acl[i + 1 : i + 2] != '"'):
                # Quoting convention is to escape " as "".
                if acl[i] == '"' and acl[i + 1 : i + 2] == '"':
                    i += 1

                output += acl[i]
                i += 1

                if i == len(acl):
                    raise ValueError("ACL syntax error: unterminated quote.")

            i += 1

    return i, output


def get_default_privileges(type: PgObjectType, owner: str) -> List[Privileges]:
    """Return the implicit privileges for an object of the given type.

    This can be called when the ACL item from PostgreSQL is NULL to determine
    the implicit access privileges.

    Parameters:
        type: Kind of object the privileges apply to.
        owner: Role that owns the object; used as grantee of the owner entry
            and as grantor of every returned entry.

    Returns:
        List of :class:`~pg_grant.types.Privileges`: the owner's ``ALL`` entry
        first, followed by a ``PUBLIC`` entry when the object type grants
        something to PUBLIC by default.

    .. seealso:: https://www.postgresql.org/docs/10/static/sql-grant.html
    """
    # "PostgreSQL grants default privileges on some types of objects to PUBLIC.
    # No privileges are granted to PUBLIC by default on tables, columns,
    # schemas or tablespaces. For other types, the default privileges granted
    # to PUBLIC are as follows: CONNECT and CREATE TEMP TABLE for databases;
    # EXECUTE privilege for functions; and USAGE privilege for languages."
    public_defaults = (
        (PgObjectType.DATABASE, ["CONNECT", "TEMPORARY"]),
        (PgObjectType.FUNCTION, ["EXECUTE"]),
        (PgObjectType.LANGUAGE, ["USAGE"]),
        # Seems like there's a documentation bug, and that types get USAGE by
        # default.
        # https://stackoverflow.com/questions/46656644
        (PgObjectType.TYPE, ["USAGE"]),
        (PgObjectType.DOMAIN, ["USAGE"]),
    )

    # "the owner has all privileges by default"
    # "PostgreSQL treats the owner's privileges as having been granted by the
    # owner to themselves"
    result = [Privileges(grantee=owner, grantor=owner, privs=["ALL"])]

    for object_type, granted in public_defaults:
        # Identity comparison, first match wins.
        if type is object_type:
            result.append(
                Privileges(grantee="PUBLIC", grantor=owner, privs=granted)
            )
            break

    return result


def parse_acl(
    acl: Union[List[str], Tuple[str, ...]],
    type: Optional[PgObjectType] = None,
    subname: Optional[str] = None,
) -> List[Privileges]:
    """Parse every item of an ACL.

    Parameters:
        acl: ACL, e.g. ``['alice=arwdDxt/alice', 'bob=arwdDxt/alice']``
        type: Optional. If passed, all privileges may be reduced to ``['ALL']``.
        subname: Optional, e.g. for column privileges.

    Returns:
        List of :class:`~.types.Privileges`, one per item and in the same
        order as `acl`.

    .. seealso::

        This is a simple wrapper; :func:`.parse_acl_item` is called for each
        item in `acl` with the same `type` and `subname`.
    """
    parsed = []
    for acl_item in acl:
        parsed.append(parse_acl_item(acl_item, type, subname))
    return parsed


def parse_acl_item(
    acl_item: str,
    type: Optional[PgObjectType] = None,
    subname: Optional[str] = None,
) -> Privileges:
    """Port of ``parseAclItem`` from `dumputils.c`_

    Splits an item of the form ``grantee=privileges/grantor`` and translates
    the one-letter privilege codes into SQL keywords. A code followed by
    ``*`` is reported as held with grant option. An empty grantee means
    ``PUBLIC``.

    Parameters:
        acl_item: ACL item, e.g. ``'alice=arwdDxt/bob'``
        type: Optional. If passed, only the privilege codes applicable to
            that object type are considered, and all privileges may be
            reduced to ``['ALL']``. If omitted, every known code is
            translated and no reduction to ``ALL`` takes place.
        subname: Optional, e.g. for column privileges. Appended in
            parentheses to every keyword. Must be output from
            :meth:`psycopg.sql.Identifier` or similar.

    .. warning::

        If the ``privs`` or ``privswgo`` attributes of the returned object
        will be used to construct an SQL statement, `subname` **must be a
        valid identifier** (e.g. by calling
        :meth:`~psycopg.sql.Composable.as_string` on
        :class:`psycopg.sql.Identifier`) in order to prevent SQL injection
        attacks.

        :func:`~pg_grant.sql.grant` and :func:`~pg_grant.sql.revoke` are not
        vulnerable, because those functions quote the embedded identifier:

        .. code-block:: pycon

            >>> from pg_grant import PgObjectType, parse_acl_item
            >>> from pg_grant.sql import grant
            >>> privs = parse_acl_item("alice=r/bob", subname="user")
            >>> privs.privs
            ['SELECT (user)']
            >>> str(grant(privs.privs, PgObjectType.TABLE, "tbl1", "alice"))
            'GRANT SELECT ("user") ON TABLE tbl1 TO alice'

        Note that ``"user"`` was quoted by :func:`~pg_grant.sql.grant`. In
        other cases, make sure to quote `subname`:

        .. code-block:: pycon

            >>> import psycopg
            >>> from psycopg.sql import Identifier
            >>> conn = psycopg.connect(...)
            >>> privs = parse_acl_item(
            ...     "alice=r/bob", subname=Identifier("user").as_string(conn)
            ... )
            >>> privs.privs
            ['SELECT ("user")']

    Returns:
        :class:`~.types.Privileges`

    Raises:
        ValueError: `acl_item` is malformed (unterminated quote, missing
            ``=`` or missing ``/``), or `type` is not a supported object
            type.
    """
    eq_pos, grantee = _get_acl_username(acl_item)

    # The name scanner stops either on "=" or at the end of the string; the
    # latter means the item has no privilege part at all. Raise explicitly
    # instead of relying on ``assert`` (stripped under ``-O``) or letting an
    # IndexError escape.
    if acl_item[eq_pos : eq_pos + 1] != "=":
        raise ValueError('ACL syntax error: missing "=" after grantee.')

    if grantee == "":
        grantee = "PUBLIC"

    slash_pos = acl_item.find("/", eq_pos)
    if slash_pos < 0:
        raise ValueError('ACL syntax error: missing "/" before grantor.')

    _, grantor = _get_acl_username(acl_item[slash_pos + 1 :])

    privs: List[str] = []
    privs_with_grant_option: List[str] = []
    all_with_grant_option = all_without_grant_option = True
    priv = acl_item[eq_pos + 1 : slash_pos]

    def qualify(keyword: str) -> str:
        """Append the parenthesised sub-object name, if one was given."""
        if subname is None:
            return keyword
        return f"{keyword} ({subname})"

    def convert_priv(code: str, keyword: str) -> None:
        """Record ``keyword`` if ``code`` is present in the privilege string."""
        nonlocal all_with_grant_option, all_without_grant_option

        pos = priv.find(code)
        if pos < 0:
            # An applicable privilege is absent: cannot be "ALL" either way.
            all_with_grant_option = all_without_grant_option = False
        elif priv[pos + 1 : pos + 2] == "*":
            # A trailing "*" marks the privilege as held with grant option.
            privs_with_grant_option.append(qualify(keyword))
            all_without_grant_option = False
        else:
            privs.append(qualify(keyword))
            all_with_grant_option = False

    if type is None:
        convert_priv("r", "SELECT")
        convert_priv("w", "UPDATE")
        convert_priv("a", "INSERT")
        convert_priv("d", "DELETE")
        convert_priv("D", "TRUNCATE")
        convert_priv("x", "REFERENCES")
        convert_priv("t", "TRIGGER")
        convert_priv("X", "EXECUTE")
        convert_priv("U", "USAGE")
        convert_priv("C", "CREATE")
        convert_priv("c", "CONNECT")
        convert_priv("T", "TEMPORARY")
        convert_priv("s", "SET")
        convert_priv("A", "ALTER SYSTEM")

        # Don't think anything can have all of them, but set all to False
        # since we don't know type.
        all_with_grant_option = all_without_grant_option = False
    elif type in {PgObjectType.TABLE, PgObjectType.SEQUENCE}:
        convert_priv("r", "SELECT")
        convert_priv("w", "UPDATE")

        if type is PgObjectType.SEQUENCE:
            convert_priv("U", "USAGE")
        else:
            convert_priv("a", "INSERT")
            convert_priv("x", "REFERENCES")

            # The remaining table privileges do not apply to columns.
            if subname is None:
                convert_priv("d", "DELETE")
                convert_priv("t", "TRIGGER")
                convert_priv("D", "TRUNCATE")
    elif type is PgObjectType.FUNCTION:
        convert_priv("X", "EXECUTE")
    elif type is PgObjectType.LANGUAGE:
        convert_priv("U", "USAGE")
    elif type is PgObjectType.SCHEMA:
        convert_priv("C", "CREATE")
        convert_priv("U", "USAGE")
    elif type is PgObjectType.DATABASE:
        convert_priv("C", "CREATE")
        convert_priv("c", "CONNECT")
        convert_priv("T", "TEMPORARY")
    elif type is PgObjectType.TABLESPACE:
        convert_priv("C", "CREATE")
    elif type is PgObjectType.TYPE:
        convert_priv("U", "USAGE")
    elif type is PgObjectType.DOMAIN:
        convert_priv("U", "USAGE")
    elif type is PgObjectType.FOREIGN_DATA_WRAPPER:
        convert_priv("U", "USAGE")
    elif type is PgObjectType.FOREIGN_SERVER:
        convert_priv("U", "USAGE")
    elif type is PgObjectType.FOREIGN_TABLE:
        convert_priv("r", "SELECT")
    elif type is PgObjectType.LARGE_OBJECT:
        convert_priv("r", "SELECT")
        convert_priv("w", "UPDATE")
    elif type is PgObjectType.PARAMETER:
        convert_priv("s", "SET")
        convert_priv("A", "ALTER SYSTEM")
    else:
        raise ValueError(f"Unknown type: {type}")

    # Every applicable privilege present with (or without) grant option:
    # collapse the individual keywords into a single "ALL".
    if all_with_grant_option:
        privs = []
        privs_with_grant_option = [qualify("ALL")]
    elif all_without_grant_option:
        privs = [qualify("ALL")]
        privs_with_grant_option = []

    return Privileges(grantee, grantor, privs, privs_with_grant_option)