import re
import sys
from typing import Any, ClassVar, List, Literal, Optional, Tuple, Union, cast, overload
from sqlalchemy import FromClause, Sequence, inspect
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.compiler import SQLCompiler
from sqlalchemy.sql.expression import ClauseElement, Executable
from ._typing_sqlalchemy import AnyTarget, ArgTypesInput, TableTarget
from .types import PgObjectType
if sys.version_info >= (3, 10):
from typing import TypeAlias
else:
from typing_extensions import TypeAlias
__all__ = (
"grant",
"revoke",
)
_re_valid_priv = re.compile(
r"(SELECT|UPDATE|INSERT|DELETE|TRUNCATE|REFERENCES|TRIGGER|EXECUTE|USAGE"
r"|CREATE|CONNECT|TEMPORARY|SET|ALTER SYSTEM|ALL)(?:\s+\((.*)\))?"
)
def _as_table(element: Any) -> FromClause:
"""Allow a Table or ORM model to be used as a table name."""
insp = inspect(element, raiseerr=False)
try:
return cast(FromClause, insp.selectable)
except AttributeError:
raise ValueError("Expected table element.")
# This could accept Sequence[str] but I think there's more utility to
# `privileges="SELECT" being a type error than allowing it.
PrivilegesInput: TypeAlias = Union[List[str], Tuple[str], Literal["ALL"]]
class _GrantRevoke(Executable, ClauseElement):
keyword: ClassVar[Optional[str]] = None
_privileges: Tuple[str, ...]
_priv_type: PgObjectType
_target: AnyTarget
_grantee: str
_grant_option: bool
_schema: Optional[str]
_arg_types: Optional[Tuple[str, ...]]
_quote_subname: bool
def __init__(
self,
privileges: PrivilegesInput,
type: PgObjectType,
target: AnyTarget,
grantee: str,
*,
grant_option: bool = False,
schema: Optional[str] = None,
arg_types: Optional[ArgTypesInput] = None,
quote_subname: bool = True,
):
if privileges == "ALL":
privileges = ("ALL",)
self._privileges = tuple(privileges)
self._priv_type = type
self._target = target
self._grantee = grantee
self._grant_option = grant_option
self._schema = schema
self._arg_types = None if arg_types is None else tuple(arg_types)
self._quote_subname = quote_subname
class _Grant(_GrantRevoke):
inherit_cache = False
keyword = "GRANT"
class _Revoke(_GrantRevoke):
inherit_cache = False
keyword = "REVOKE"
@compiles(_GrantRevoke) # type: ignore[no-untyped-call,misc]
def _pg_grant(element: _Grant, compiler: SQLCompiler, **kw: Any) -> str:
target = element._target
schema = element._schema
arg_types = element._arg_types
priv_type = element._priv_type
preparer = compiler.preparer
privs = []
for priv in element._privileges:
match = _re_valid_priv.match(priv)
if match is None:
raise ValueError(f"Privilege not valid: {priv}")
subname = match.group(2)
if subname is not None:
if element._quote_subname:
subname = preparer.quote(subname)
privs.append(f"{match.group(1)} ({subname})")
else:
privs.append(match.group(1))
priv = ", ".join(privs)
str_target = None
if isinstance(target, str):
if schema is not None:
str_target = preparer.quote_schema(schema) + "." + preparer.quote(target)
else:
str_target = preparer.quote(target)
else:
if schema is not None:
raise ValueError("schema argument not supported unless target is a string.")
if priv_type is not PgObjectType.FUNCTION and arg_types is not None:
raise ValueError("arg_types argument not supported unless type is FUNCTION.")
if priv_type is PgObjectType.TABLE:
if str_target is not None:
target = str_target
else:
target = compiler.process(_as_table(target), ashint=True)
elif priv_type is PgObjectType.SEQUENCE:
if str_target is not None:
target = str_target
else:
target = preparer.format_sequence(target) # type: ignore[no-untyped-call]
elif priv_type is PgObjectType.TYPE:
if str_target is None:
target = compiler.process(target) # type: ignore[arg-type]
else:
target = str_target
elif priv_type is PgObjectType.FUNCTION:
if str_target is None:
target = compiler.process(target) # type: ignore[arg-type]
else:
if arg_types is None:
raise ValueError(
"Must use an empty sequence if function has "
"no arguments, not None."
)
str_arg_types = ", ".join([preparer.quote(t) for t in arg_types])
target = f"{str_target}({str_arg_types})"
elif isinstance(priv_type, PgObjectType):
if str_target is None:
target = compiler.process(target) # type: ignore[arg-type]
else:
target = str_target
else:
raise ValueError(f"Unknown type: {priv_type}")
is_grant = element.keyword == "GRANT"
grantee = element._grantee
if grantee.upper() != "PUBLIC":
grantee = compiler.preparer.quote(grantee)
return "{}{} {} ON {} {} {} {}{}".format(
element.keyword,
" GRANT OPTION FOR" if element._grant_option and not is_grant else "",
priv,
priv_type.value,
target,
"TO" if is_grant else "FROM",
grantee,
" WITH GRANT OPTION" if element._grant_option and is_grant else "",
)
@overload
def grant(
privileges: PrivilegesInput,
type: Literal[
PgObjectType.TABLE,
PgObjectType.SEQUENCE,
PgObjectType.LANGUAGE,
PgObjectType.SCHEMA,
PgObjectType.DATABASE,
PgObjectType.TABLESPACE,
PgObjectType.TYPE,
PgObjectType.DOMAIN,
PgObjectType.FOREIGN_DATA_WRAPPER,
PgObjectType.FOREIGN_SERVER,
PgObjectType.FOREIGN_TABLE,
PgObjectType.LARGE_OBJECT,
PgObjectType.PARAMETER,
],
target: str,
grantee: str,
*,
grant_option: bool = ...,
schema: Optional[str] = ...,
arg_types: Optional[ArgTypesInput] = ...,
quote_subname: bool = ...,
) -> Executable:
"""This overload handles all cases where target and schema are strings
except functions (see arg_types).
"""
@overload
def grant(
privileges: PrivilegesInput,
type: Literal[PgObjectType.TABLE],
target: TableTarget,
grantee: str,
*,
grant_option: bool = ...,
schema: None = ...,
arg_types: None = ...,
quote_subname: bool = ...,
) -> Executable:
...
@overload
def grant(
privileges: PrivilegesInput,
type: Literal[PgObjectType.SEQUENCE],
target: Sequence,
grantee: str,
*,
grant_option: bool = ...,
schema: None = ...,
arg_types: None = ...,
quote_subname: bool = ...,
) -> Executable:
...
@overload
def grant(
privileges: PrivilegesInput,
type: Literal[PgObjectType.FUNCTION],
target: str,
grantee: str,
*,
grant_option: bool = ...,
schema: Optional[str] = ...,
arg_types: ArgTypesInput,
quote_subname: bool = ...,
) -> Executable:
...
[docs]
def grant(
privileges: PrivilegesInput,
type: PgObjectType,
target: AnyTarget,
grantee: str,
*,
grant_option: bool = False,
schema: Optional[str] = None,
arg_types: Optional[ArgTypesInput] = None,
quote_subname: bool = True,
) -> Executable:
"""GRANT statement that may be executed by SQLAlchemy.
Parameters:
privileges: List of privileges (or ``'ALL'``).
type: PostgreSQL object type.
target: Object name, or appropriate SQLAlchemy object (e.g.
:class:`~sqlalchemy.schema.Table` or a declarative class).
grantee: Role to receive privileges.
grant_option: Whether the recipient may in turn grant these privileges
to others.
schema: Optional schema, if `target` is a string.
arg_types: Sequence of argument types for granting privileges on
functions. E.g. ``('int4', 'int4')`` or ``()``.
quote_subname: Quote subname identifier in privileges, e.g.
``'SELECT (user)'`` -> ``'SELECT ("user")``. This should
only be ``False`` if the subname is already a valid
identifier.
.. seealso:: https://www.postgresql.org/docs/current/static/sql-grant.html
"""
return _Grant(
privileges,
type,
target,
grantee,
grant_option=grant_option,
schema=schema,
arg_types=arg_types,
quote_subname=quote_subname,
)
@overload
def revoke(
privileges: PrivilegesInput,
type: Literal[
PgObjectType.TABLE,
PgObjectType.SEQUENCE,
PgObjectType.LANGUAGE,
PgObjectType.SCHEMA,
PgObjectType.DATABASE,
PgObjectType.TABLESPACE,
PgObjectType.TYPE,
PgObjectType.DOMAIN,
PgObjectType.FOREIGN_DATA_WRAPPER,
PgObjectType.FOREIGN_SERVER,
PgObjectType.FOREIGN_TABLE,
PgObjectType.LARGE_OBJECT,
PgObjectType.PARAMETER,
],
target: str,
grantee: str,
*,
grant_option: bool = ...,
schema: Optional[str] = ...,
arg_types: Optional[ArgTypesInput] = ...,
quote_subname: bool = ...,
) -> Executable:
"""This overload handles all cases where target and schema are strings
except functions (see arg_types).
"""
@overload
def revoke(
privileges: PrivilegesInput,
type: Literal[PgObjectType.TABLE],
target: TableTarget,
grantee: str,
*,
grant_option: bool = ...,
schema: None = ...,
arg_types: None = ...,
quote_subname: bool = ...,
) -> Executable:
...
@overload
def revoke(
privileges: PrivilegesInput,
type: Literal[PgObjectType.SEQUENCE],
target: Sequence,
grantee: str,
*,
grant_option: bool = ...,
schema: None = ...,
arg_types: None = ...,
quote_subname: bool = ...,
) -> Executable:
...
@overload
def revoke(
privileges: PrivilegesInput,
type: Literal[PgObjectType.FUNCTION],
target: str,
grantee: str,
*,
grant_option: bool = ...,
schema: Optional[str] = ...,
arg_types: ArgTypesInput,
quote_subname: bool = ...,
) -> Executable:
...
[docs]
def revoke(
privileges: PrivilegesInput,
type: PgObjectType,
target: AnyTarget,
grantee: str,
*,
grant_option: bool = False,
schema: Optional[str] = None,
arg_types: Optional[ArgTypesInput] = None,
quote_subname: bool = True,
) -> Executable:
"""REVOKE statement that may be executed by SQLAlchemy.
Parameters:
privileges: List of privileges (or ``'ALL'``).
type: PostgreSQL object type.
target: Object name, or appropriate SQLAlchemy object (e.g.
:class:`~sqlalchemy.schema.Table` or a declarative class).
grantee: Role to lose privileges.
grant_option: Whether to revoke the grant option for these privileges.
schema: Optional schema, if `target` is a string.
arg_types: Sequence of argument types for revoking privileges on
functions. E.g. ``('int4', 'int4')`` or ``()``.
quote_subname: Quote subname identifier in privileges, e.g.
``'SELECT (user)'`` -> ``'SELECT ("user")``. This should
only be ``False`` if the subname is already a valid
identifier.
.. warning:: When ``grant_option=True``, only the grant option is revoked,
not the privilege(s).
.. seealso:: https://www.postgresql.org/docs/current/static/sql-revoke.html
"""
return _Revoke(
privileges,
type,
target,
grantee,
grant_option=grant_option,
schema=schema,
arg_types=arg_types,
quote_subname=quote_subname,
)