Source code for pg_grant.parse

from typing import List, Optional, Sequence

from .types import Privileges, PgObjectType


def _get_acl_username(acl):
    """Port of ``copyAclUserName`` from ``dumputils.c``"""
    i = 0
    output = ''

    while i < len(acl) and acl[i] != '=':
        # If user name isn't quoted, then just add it to the output buffer
        if acl[i] != '"':
            output += acl[i]
            i += 1
        else:
            # Otherwise, it's a quoted username
            i += 1

            if i == len(acl):
                raise ValueError('ACL syntax error: unterminated quote.')

            # Loop until we come across an unescaped quote
            while not (acl[i] == '"' and acl[i + 1:i + 2] != '"'):
                # Quoting convention is to escape " as "".
                if acl[i] == '"' and acl[i + 1:i + 2] == '"':
                    i += 1

                output += acl[i]
                i += 1

                if i == len(acl):
                    raise ValueError('ACL syntax error: unterminated quote.')

            i += 1

    return i, output


[docs]def get_default_privileges(type: PgObjectType, owner: str) -> List[Privileges]: """Return a list of :class:`~pg_grant.types.Privileges` objects matching the default privileges for that type. This can be called when the ACL item from PostgreSQL is NULL to determine the implicit access privileges. .. seealso:: https://www.postgresql.org/docs/10/static/sql-grant.html """ # "the owner has all privileges by default" # "PostgreSQL treats the owner's privileges as having been granted by the # owner to themselves" priv_list = [Privileges(grantee=owner, grantor=owner, privs=['ALL'])] public_privs = None # "PostgreSQL grants default privileges on some types of objects to PUBLIC. # No privileges are granted to PUBLIC by default on tables, columns, # schemas or tablespaces. For other types, the default privileges granted # to PUBLIC are as follows: CONNECT and CREATE TEMP TABLE for databases; # EXECUTE privilege for functions; and USAGE privilege for languages." if type is PgObjectType.DATABASE: public_privs = ['CONNECT', 'TEMPORARY'] elif type is PgObjectType.FUNCTION: public_privs = ['EXECUTE'] elif type is PgObjectType.LANGUAGE: public_privs = ['USAGE'] elif type is PgObjectType.TYPE: # Seems like there's a documentation bug, and that types get USAGE by # default. # https://stackoverflow.com/questions/46656644 public_privs = ['USAGE'] if public_privs: priv_list.append( Privileges(grantee='PUBLIC', grantor=owner, privs=public_privs)) return priv_list
[docs]def parse_acl( acl: Sequence[str], type: Optional[PgObjectType] = None, subname: Optional[str] = None, ) -> List[Privileges]: """ Parameters: acl: ACL, e.g. ``['alice=arwdDxt/alice', 'bob=arwdDxt/alice']`` type: Optional. If passed, all privileges may be reduced to ``['ALL']``. subname: Optional, e.g. for column privileges. Returns: List of :class:`~.types.Privileges`. .. seealso:: This is a simple wrapper; :func:`.parse_acl_item` is called for each item in `acl`. """ return [parse_acl_item(i, type, subname) for i in acl]
[docs]def parse_acl_item( acl_item: str, type: Optional[PgObjectType] = None, subname: Optional[str] = None, ) -> Privileges: """Port of ``parseAclItem`` from `dumputils.c`_ Parameters: acl_item: ACL item, e.g. ``'alice=arwdDxt/bob'`` type: Optional. If passed, all privileges may be reduced to ``['ALL']``. subname: Optional, e.g. for column privileges. Must be output from :func:`psycopg2.extensions.quote_ident` or similar. .. warning:: If the ``privs`` or ``privswgo`` attributes of the returned object will be used to construct an SQL statement, `subname` **must be a valid identifier** (e.g. by calling :func:`psycopg2.extensions.quote_ident`) in order to prevent SQL injection attacks. :func:`~pg_grant.sql.grant` and :func:`~pg_grant.sql.revoke` are not vulnerable, because those functions quote the embedded identifier: .. code-block:: python >>> from pg_grant import PgObjectType, parse_acl_item >>> from pg_grant.sql import grant >>> privs = parse_acl_item('alice=r/bob', subname='user') >>> privs.privs ['SELECT (user)'] >>> str(grant(privs.privs, PgObjectType.TABLE, 'tbl1', 'alice')) 'GRANT SELECT ("user") ON TABLE tbl1 TO alice' Note that ``"user"`` was quoted by :func:`~pg_grant.sql.grant`. In other cases, make sure to quote `subname`: .. code-block:: python >>> import psycopg2 >>> from psycopg2.extensions import quote_ident >>> conn = psycopg2.connect(...) >>> parse_acl_item('alice=r/bob', subname=quote_ident('user', conn)) >>> privs.privs ['SELECT ("user")'] Returns: :class:`~.types.Privileges` """ eq_pos, grantee = _get_acl_username(acl_item) assert acl_item[eq_pos] == '=' if grantee == '': grantee = 'PUBLIC' slash_pos = acl_item.index('/', eq_pos) _, grantor = _get_acl_username(acl_item[slash_pos + 1:]) privs = [] privs_with_grant_option = [] all_with_grant_option = all_without_grant_option = True priv = acl_item[eq_pos + 1:slash_pos] def convert_priv(code, keyword): nonlocal all_with_grant_option, all_without_grant_option pos = priv.find(code) if pos >= 0: if priv[pos + 1:pos + 2] == '*': s = keyword if subname is not None: s += ' ({})'.format(subname) privs_with_grant_option.append(s) all_without_grant_option = False else: s = keyword if subname is not None: s += ' ({})'.format(subname) privs.append(s) all_with_grant_option = False else: all_with_grant_option = all_without_grant_option = False if type is None: convert_priv('r', 'SELECT') convert_priv('w', 'UPDATE') convert_priv('a', 'INSERT') convert_priv('d', 'DELETE') convert_priv('D', 'TRUNCATE') convert_priv('x', 'REFERENCES') convert_priv('t', 'TRIGGER') convert_priv('X', 'EXECUTE') convert_priv('U', 'USAGE') convert_priv('C', 'CREATE') convert_priv('c', 'CONNECT') convert_priv('T', 'TEMPORARY') # Don't think anything can have all of them, but set all to False # since we don't know type. all_with_grant_option = all_without_grant_option = False elif type in {PgObjectType.TABLE, PgObjectType.SEQUENCE}: convert_priv('r', 'SELECT') convert_priv('w', 'UPDATE') if type is PgObjectType.SEQUENCE: convert_priv('U', 'USAGE') else: convert_priv('a', 'INSERT') convert_priv('x', 'REFERENCES') if subname is None: convert_priv('d', 'DELETE') convert_priv('t', 'TRIGGER') convert_priv('D', 'TRUNCATE') elif type is PgObjectType.FUNCTION: convert_priv('X', 'EXECUTE') elif type is PgObjectType.LANGUAGE: convert_priv('U', 'USAGE') elif type is PgObjectType.SCHEMA: convert_priv('C', 'CREATE') convert_priv('U', 'USAGE') elif type is PgObjectType.DATABASE: convert_priv('C', 'CREATE') convert_priv('c', 'CONNECT') convert_priv('T', 'TEMPORARY') elif type is PgObjectType.TABLESPACE: convert_priv('C', 'CREATE') elif type is PgObjectType.TYPE: convert_priv('U', 'USAGE') elif type is PgObjectType.FOREIGN_DATA_WRAPPER: convert_priv('U', 'USAGE') elif type is PgObjectType.FOREIGN_SERVER: convert_priv('U', 'USAGE') elif type is PgObjectType.FOREIGN_TABLE: convert_priv('r', 'SELECT') elif type is PgObjectType.LARGE_OBJECT: convert_priv('r', 'SELECT') convert_priv('w', 'UPDATE') else: raise ValueError('Unknown type: {}'.format(type)) if all_with_grant_option: privs = [] privs_with_grant_option = [] s = 'ALL' if subname is not None: s += ' ({})'.format(subname) privs_with_grant_option.append(s) elif all_without_grant_option: privs = [] privs_with_grant_option = [] s = 'ALL' if subname is not None: s += ' ({})'.format(subname) privs.append(s) return Privileges(grantee, grantor, privs, privs_with_grant_option)