import re
from sqlalchemy import inspect
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import ClauseElement, Executable
from pg_grant.types import PgObjectType
# Names exported by ``from <module> import *``: only the two public
# statement factories; every other name in this module is private.
__all__ = ('grant', 'revoke')
# Parses one privilege specification.
#   group(1): the privilege keyword (``SELECT``, ``UPDATE``, ..., ``ALL``).
#   group(2): the text between the parentheses of an optional sub-name
#             (column) clause, or ``None`` when no such clause is present.
# Whitespace before the opening parenthesis is optional, so both
# ``SELECT (col)`` and ``SELECT(col)`` yield the column restriction. With a
# mandatory space the latter matched as a bare ``SELECT`` and the column
# restriction was silently lost.
# The pattern is not anchored at the end; when used with ``.match()`` any
# text following the recognised part is ignored.
_re_valid_priv = re.compile(
    r'(SELECT|UPDATE|INSERT|DELETE|TRUNCATE|REFERENCES|TRIGGER|EXECUTE|USAGE'
    r'|CREATE|CONNECT|TEMPORARY|ALL)(?:\s*\((.*)\))?')
def _as_table(element):
    """Resolve a Table or ORM model to the selectable it stands for.

    ``element`` is run through SQLAlchemy's ``inspect`` with
    ``raiseerr=False`` and the ``selectable`` attribute of the result is
    returned.

    Raises:
        ValueError: If the inspection result exposes no ``selectable``
            attribute (for instance because nothing inspectable was
            passed in).
    """
    inspected = inspect(element, raiseerr=False)
    try:
        selectable = inspected.selectable
    except AttributeError:
        raise ValueError('Expected table element.')
    else:
        return selectable
class _GrantRevoke(Executable, ClauseElement):
    """Common base for the GRANT and REVOKE clause elements.

    Instances only carry the statement's parameters. Subclasses provide the
    SQL verb through ``keyword``.

    Attributes:
        privileges: Sequence of privilege strings, e.g.
            ``['SELECT', 'UPDATE (col)']``.
        priv_type: The ``PgObjectType`` given as ``type``.
        target: Object name (``str``) or a SQLAlchemy object.
        grantee: Role name the statement applies to.
        grant_option: Whether the grant option is involved.
        schema: Optional schema name qualifying a string ``target``.
        arg_types: Optional sequence of function argument type names.
        quote_subname: Whether sub-names inside privileges are quoted.
    """

    # Privilege keywords known to this module. Not consulted by any method
    # of this class.
    valid_privileges = {
        'SELECT',
        'UPDATE',
        'INSERT',
        'DELETE',
        'TRUNCATE',
        'REFERENCES',
        'TRIGGER',
        'EXECUTE',
        'USAGE',
        'CREATE',
        'CONNECT',
        'TEMPORARY',
        'ALL',
    }

    # SQL verb; overridden by concrete subclasses.
    keyword = None

    def __init__(self, privileges, type: PgObjectType, target, grantee,
                 grant_option=False, schema=None, arg_types=None,
                 quote_subname=True):
        """Store the statement parameters.

        Args:
            privileges: A sequence of privilege strings, or a single
                privilege given as a plain string (``'ALL'``,
                ``'SELECT'``, ...).
            type: PostgreSQL object type the privileges apply to.
            target: Object name or SQLAlchemy object.
            grantee: Role name.
            grant_option: Whether the grant option is involved.
            schema: Optional schema for a string ``target``.
            arg_types: Optional sequence of function argument types.
            quote_subname: Whether to quote sub-names inside privileges.
        """
        # A bare string is one privilege, not a sequence of them. Without
        # this, anything other than 'ALL' would be iterated character by
        # character downstream and rejected as an invalid privilege.
        if isinstance(privileges, str):
            privileges = [privileges]

        self.privileges = privileges
        self.priv_type = type
        self.target = target
        self.grantee = grantee
        self.grant_option = grant_option
        self.schema = schema
        self.arg_types = arg_types
        self.quote_subname = quote_subname
class _Grant(_GrantRevoke):
    """Clause element for a ``GRANT`` statement."""

    keyword = 'GRANT'

    # NOTE(review): presumably set so SQLAlchemy does not reuse the parent
    # class's cache-key scheme for this custom construct; confirm against
    # the SQLAlchemy compiler-extension documentation.
    inherit_cache = False
class _Revoke(_GrantRevoke):
    """Clause element for a ``REVOKE`` statement."""

    keyword = 'REVOKE'

    # NOTE(review): presumably set so SQLAlchemy does not reuse the parent
    # class's cache-key scheme for this custom construct; confirm against
    # the SQLAlchemy compiler-extension documentation.
    inherit_cache = False
@compiles(_GrantRevoke)
def _pg_grant(element, compiler, **kw):
    """Render a ``_GrantRevoke`` element as GRANT / REVOKE SQL text.

    The result has the shape::

        <KEYWORD>[ GRANT OPTION FOR] <privs> ON <type> <target>
            TO|FROM <grantee>[ WITH GRANT OPTION]

    Args:
        element: The ``_GrantRevoke`` instance being compiled.
        compiler: The SQL compiler; its ``preparer`` is used for quoting.
        **kw: Accepted for the compiler hook signature; not used here.

    Returns:
        The statement as a string.

    Raises:
        ValueError: If a privilege does not match ``_re_valid_priv``, if
            ``schema`` is given with a non-string target, if ``arg_types``
            is given for a non-FUNCTION type, if a string FUNCTION target
            has ``arg_types`` of ``None``, or if the type is not a
            ``PgObjectType``.
    """
    preparer = compiler.preparer
    raw_target = element.target
    schema_name = element.schema
    arg_types = element.arg_types
    obj_type = element.priv_type

    # 1. Privilege list.
    # NOTE(review): the whole parenthesised text is handed to
    # ``preparer.quote`` as one identifier, so a multi-column list such as
    # ``SELECT (a, b)`` is not quoted column by column; confirm whether
    # that is intended.
    rendered_privs = []
    for spec in element.privileges:
        parsed = _re_valid_priv.match(spec)
        if parsed is None:
            raise ValueError('Privilege not valid: {}'.format(spec))

        sub = parsed.group(2)
        if sub is None:
            rendered_privs.append(parsed.group(1))
            continue

        if element.quote_subname:
            sub = preparer.quote(sub)
        rendered_privs.append('{} ({})'.format(parsed.group(1), sub))
    priv_clause = ', '.join(rendered_privs)

    # 2. A string target is quoted here (optionally schema-qualified);
    #    ``named`` stays None for SQLAlchemy objects.
    named = None
    if isinstance(raw_target, str):
        pieces = []
        if schema_name is not None:
            pieces.append(preparer.quote_schema(schema_name))
        pieces.append(preparer.quote(raw_target))
        named = '.'.join(pieces)
    elif schema_name is not None:
        raise ValueError('schema argument not supported unless target is a string.')

    if arg_types is not None and priv_type_is_not_function(obj_type):
        raise ValueError('arg_types argument not supported unless type is FUNCTION.')

    # 3. Final target text, depending on the object type.
    if obj_type is PgObjectType.TABLE:
        if named is None:
            rendered_target = compiler.process(_as_table(raw_target), ashint=True)
        else:
            rendered_target = named
    elif obj_type is PgObjectType.SEQUENCE:
        if named is None:
            rendered_target = preparer.format_sequence(raw_target)
        else:
            rendered_target = named
    elif obj_type is PgObjectType.FUNCTION:
        if named is None:
            rendered_target = compiler.process(raw_target)
        elif arg_types is None:
            raise ValueError('Must use an empty sequence if function has '
                             'no arguments, not None.')
        else:
            # A function given by name needs its argument signature.
            signature = ', '.join(preparer.quote(t) for t in arg_types)
            rendered_target = '{}({})'.format(named, signature)
    elif obj_type is PgObjectType.TYPE or isinstance(obj_type, PgObjectType):
        # TYPE and every remaining object type are rendered the same way.
        rendered_target = compiler.process(raw_target) if named is None else named
    else:
        raise ValueError('Unknown type: {}'.format(obj_type))

    # 4. Grantee: PUBLIC (any letter case) is emitted as given, any other
    #    role name goes through the preparer's quoting.
    is_grant = element.keyword == 'GRANT'
    recipient = element.grantee
    if recipient.upper() != 'PUBLIC':
        recipient = preparer.quote(recipient)

    # 5. The grant option is a prefix for REVOKE and a suffix for GRANT.
    if is_grant:
        option_prefix = ''
        preposition = 'TO'
        option_suffix = ' WITH GRANT OPTION' if element.grant_option else ''
    else:
        option_prefix = ' GRANT OPTION FOR' if element.grant_option else ''
        preposition = 'FROM'
        option_suffix = ''

    return '{}{} {} ON {} {} {} {}{}'.format(
        element.keyword,
        option_prefix,
        priv_clause,
        obj_type.value,
        rendered_target,
        preposition,
        recipient,
        option_suffix,
    )


def priv_type_is_not_function(obj_type):
    """Return True when ``obj_type`` is not ``PgObjectType.FUNCTION``."""
    return obj_type is not PgObjectType.FUNCTION
def grant(privileges, type: PgObjectType, target, grantee, grant_option=False,
          schema=None, arg_types=None, quote_subname=True):
    """GRANT statement that may be executed by SQLAlchemy.

    Parameters:
        privileges: List of privileges (or ``'ALL'``).
        type: PostgreSQL object type.
        target: Object name, or appropriate SQLAlchemy object (e.g.
            :class:`~sqlalchemy.schema.Table` or a declarative class).
        grantee: Role to receive privileges.
        grant_option: Whether the recipient may in turn grant these
            privileges to others.
        schema: Optional schema, if `target` is a string.
        arg_types: Sequence of argument types for granting privileges on
            functions. E.g. ``('int4', 'int4')`` or ``()``.
        quote_subname: Quote subname identifier in privileges, e.g.
            ``'SELECT (user)'`` -> ``'SELECT ("user")'``. This should
            only be ``False`` if the subname is already a valid
            identifier.

    Returns:
        An executable clause element representing the GRANT statement.

    .. seealso:: https://www.postgresql.org/docs/current/static/sql-grant.html
    """
    return _Grant(
        privileges, type, target, grantee, grant_option=grant_option,
        schema=schema, arg_types=arg_types, quote_subname=quote_subname)
def revoke(privileges, type: PgObjectType, target, grantee, grant_option=False,
           schema=None, arg_types=None, quote_subname=True):
    """REVOKE statement that may be executed by SQLAlchemy.

    Parameters:
        privileges: List of privileges (or ``'ALL'``).
        type: PostgreSQL object type.
        target: Object name, or appropriate SQLAlchemy object (e.g.
            :class:`~sqlalchemy.schema.Table` or a declarative class).
        grantee: Role to lose privileges.
        grant_option: Whether to revoke the grant option for these
            privileges.
        schema: Optional schema, if `target` is a string.
        arg_types: Sequence of argument types for revoking privileges on
            functions. E.g. ``('int4', 'int4')`` or ``()``.
        quote_subname: Quote subname identifier in privileges, e.g.
            ``'SELECT (user)'`` -> ``'SELECT ("user")'``. This should
            only be ``False`` if the subname is already a valid
            identifier.

    Returns:
        An executable clause element representing the REVOKE statement.

    .. warning:: When ``grant_option=True``, only the grant option is
        revoked, not the privilege(s).

    .. seealso:: https://www.postgresql.org/docs/current/static/sql-revoke.html
    """
    return _Revoke(
        privileges, type, target, grantee, grant_option=grant_option,
        schema=schema, arg_types=arg_types, quote_subname=quote_subname)