from __future__ import annotations
import binascii
import re
import struct
from typing import Any
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql import functions
from sqlalchemy.sql.functions import FunctionElement
from sqlalchemy.types import to_instance
from geoalchemy2.exc import ArgumentError
# Alias for ``binascii.Error``. ``RasterElement.__init__`` catches it to tell
# hexadecimal input apart from raw binary input.
BinasciiError = binascii.Error
# Names that ``_SpatialElement.__getattr__`` is allowed to turn into SQL
# function calls. Lookups use the lower-cased attribute name, so entries are
# presumably stored lower-cased. The set starts empty here and is filled
# outside of this section (TODO confirm where).
function_registry: set[str] = set()
# Base class shared by the spatial element wrappers defined below.
class _SpatialElement:
    """The base class for public spatial elements.

    Subclasses are expected to provide a ``desc`` attribute/property (the
    textual description used by ``__str__``, ``__eq__`` and ``__hash__``) and
    to override :meth:`_data_from_desc`, which rebuilds ``data`` from that
    description when an instance is unpickled.

    Args:
        data: The first argument passed to the constructor is the data wrapped
            by the ``_SpatialElement`` object being constructed.
        srid: An integer representing the spatial reference system. E.g. ``4326``.
            Default value is ``-1``, which means no/unknown reference system.
        extended: A boolean indicating whether the extended format (EWKT or EWKB)
            is used. Default is ``None``.
    """

    # Define __slots__ to restrict attributes in this class.
    # This is done intentionally to improve performance by preventing
    # the creation of a dynamic __dict__ for each instance.
    __slots__ = ("srid", "data", "extended")

    def __init__(self, data, srid: int = -1, extended: bool | None = None) -> None:
        self.srid = srid
        self.data = data
        self.extended = extended

    def __str__(self) -> str:
        """Return the element's description string (``self.desc``)."""
        return self.desc

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__} at 0x{id(self):x}; {self}>"  # pragma: no cover

    def __eq__(self, other) -> bool:
        """Compare ``extended``, ``srid`` and ``desc``.

        Objects that lack one of these attributes compare unequal instead of
        raising.
        """
        try:
            return (
                self.extended == other.extended
                and self.srid == other.srid
                and self.desc == other.desc
            )
        except AttributeError:
            return False

    def __ne__(self, other) -> bool:
        return not self.__eq__(other)

    def __hash__(self):
        # Built from the same three values that ``__eq__`` compares.
        return hash((self.desc, self.srid, self.extended))

    def __getattr__(self, name):
        """Resolve registered spatial function names to bound SQL functions.

        Only called when normal attribute lookup fails.

        Raises:
            AttributeError: If ``name`` (lower-cased) is not in
                ``function_registry``.
        """
        #
        # This is how things like lake.geom.ST_Buffer(2) creates
        # SQL expressions of this form:
        #
        # ST_Buffer(ST_GeomFromWKB(:ST_GeomFromWKB_1), :param_1)
        #
        # Raise an AttributeError when the attribute name is not a registered
        # function name. This is to be nice with other libraries that use
        # some ducktyping (e.g. hasattr(element, "copy")) to determine
        # the type of the element.
        if name.lower() not in function_registry:
            # Use the conventional message so that a typo in an attribute or
            # function name is reported with the offending name.
            raise AttributeError(f"{type(self).__name__!r} object has no attribute {name!r}")

        # We create our own _FunctionGenerator here, and use it in place of
        # SQLAlchemy's "func" object. This is to be able to "bind" the
        # function to the SQL expression.
        func_ = functions._FunctionGenerator(expr=self)
        return getattr(func_, name)

    def __getstate__(self) -> dict[str, Any]:
        """Return the picklable state; ``data`` is stored as ``str(self)``."""
        state = {
            "srid": self.srid,
            "data": str(self),
            "extended": self.extended,
        }
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        """Restore the state produced by :meth:`__getstate__`."""
        self.srid = state["srid"]
        self.extended = state["extended"]
        # The pickled "data" entry is the description string; let the subclass
        # convert it back to its native data representation.
        self.data = self._data_from_desc(state["data"])

    @staticmethod
    def _data_from_desc(desc):
        """Convert a description string back to ``data``; subclasses override this."""
        raise NotImplementedError()  # pragma: no cover
# WKT / EWKT element wrappers.
class WKTElement(_SpatialElement):
    """Instances of this class wrap a WKT or EWKT value.

    Usage examples::

        wkt_element_1 = WKTElement('POINT(5 45)')
        wkt_element_2 = WKTElement('POINT(5 45)', srid=4326)
        wkt_element_3 = WKTElement('SRID=4326;POINT(5 45)', extended=True)

    Note::

        This class uses ``__slots__`` to restrict its attributes and improve memory efficiency by
        preventing the creation of a dynamic ``__dict__`` for each instance.
        If you require dynamic attributes or support for weak references, use the
        ``DynamicWKTElement`` subclass, which provides these capabilities.
    """

    __slots__ = ()

    # Splits an optional "SRID=<n>;" prefix (group 1) from the geometry text
    # (group 3). ``re.DOTALL`` is required so that ``.*`` also spans newlines:
    # without it a multi-line (E)WKT string is cut at the first line break.
    _REMOVE_SRID = re.compile(r"(SRID=([0-9]+); ?)?(.*)", re.DOTALL)
    SPLIT_WKT_PATTERN = re.compile(r"((SRID=\d+) *; *)?([\w ]+) *(\([-\d\. ,\(\)eE]+\))")

    geom_from: str = "ST_GeomFromText"
    geom_from_extended_version: str = "ST_GeomFromEWKT"

    def __init__(self, data: str, srid: int = -1, extended: bool | None = None) -> None:
        """Wrap a WKT/EWKT string.

        Args:
            data: The WKT or EWKT text.
            srid: The SRID; when left at ``-1`` and the element is extended,
                it is read from the ``SRID=<n>;`` prefix of ``data``.
            extended: Whether ``data`` is EWKT. When ``None`` it is detected
                from a leading ``SRID=``.

        Raises:
            ArgumentError: If the SRID has to be read from ``data`` but
                ``data`` does not have exactly one ``;`` separator or the
                prefix does not hold an integer.
        """
        if extended is None:
            extended = data.startswith("SRID=")
        if extended and srid == -1:
            # read srid from EWKT
            data_s = data.split(";")
            if len(data_s) != 2:
                raise ArgumentError(f"invalid EWKT string {data}")
            header = data_s[0]
            try:
                # Skip the 5 characters of "SRID=".
                srid = int(header[5:])
            except ValueError:
                raise ArgumentError(f"invalid EWKT string {data}") from None
        _SpatialElement.__init__(self, data, srid, extended)

    @property
    def desc(self) -> str:
        """This element's description string."""
        return self.data

    @staticmethod
    def _data_from_desc(desc):
        # The description of a WKT element is the data itself.
        return desc

    def as_wkt(self) -> WKTElement:
        """Return a new element in plain WKT form.

        For an extended element the ``SRID=<n>;`` prefix is removed from the
        text while the ``srid`` attribute is kept.
        """
        if self.extended:
            srid_match = self._REMOVE_SRID.match(self.data)
            # Every part of the pattern is optional, so it always matches; the
            # assert only narrows the type for static checkers.
            assert srid_match is not None
            return WKTElement(srid_match.group(3), self.srid, extended=False)
        return WKTElement(self.data, self.srid, self.extended)

    def as_ewkt(self) -> WKTElement:
        """Return a new element in EWKT form.

        The ``SRID=<n>;`` prefix is added only when the element is not already
        extended and its SRID is known (not ``-1``); otherwise a copy with the
        same values is returned.
        """
        if not self.extended and self.srid != -1:
            data = f"SRID={self.srid};" + self.data
            return WKTElement(data, extended=True)
        return WKTElement(self.data, self.srid, self.extended)
class DynamicWKTElement(WKTElement):
    """``WKTElement`` variant whose instances accept arbitrary attributes.

    Listing ``__dict__`` and ``__weakref__`` in ``__slots__`` gives instances
    the per-instance dictionary and weak-reference support that the slotted
    parent class leaves out.
    """

    __slots__ = ("__dict__", "__weakref__")
# WKB / EWKB element wrappers.
class WKBElement(_SpatialElement):
    """Wrapper around a WKB or EWKB value.

    Geometry values read from the database are converted to instances of this
    type, so creating ``WKBElement`` objects by hand is rarely needed.

    When ``extended`` is ``True`` and ``srid`` is ``-1`` at construction time,
    the SRID is read from the EWKB data.

    Note: ``WKBElement`` objects can be created from Shapely geometries with
    the :func:`geoalchemy2.shape.from_shape` function.

    Note::

        This class uses ``__slots__`` to restrict its attributes and improve memory efficiency by
        preventing the creation of a dynamic ``__dict__`` for each instance.
        If you require dynamic attributes or support for weak references, use the
        ``DynamicWKBElement`` subclass, which provides these capabilities.
    """

    __slots__ = ()

    geom_from: str = "ST_GeomFromWKB"
    geom_from_extended_version: str = "ST_GeomFromEWKB"

    # Bit of the type word that signals an embedded SRID, and the mask that
    # clears that bit while keeping every other bit.
    _SRID_FLAG = 0x20000000
    _SRID_FLAG_CLEARED = 0xDFFFFFFF

    def __init__(
        self, data: str | bytes | memoryview, srid: int = -1, extended: bool | None = None
    ) -> None:
        must_read_header = srid == -1 or extended is None or extended
        if must_read_header:
            # Layout of the (E)WKB header:
            #
            #    byte   byteOrder   (0 = most significant byte first,
            #                        1 = least significant byte first)
            #    uint32 wkbType
            #    uint32 SRID        (EWKB only)
            #
            # See https://trac.osgeo.org/postgis/browser/branches/3.0/doc/ZMSgeoms.txt
            # for more details about WKB/EWKB specifications.
            if isinstance(data, str):
                # SpatiaLite case: assume that the string is an hex value
                header = binascii.unhexlify(data[:18])
            else:
                header = data[:9]
            byte_order = header[0]
            raw_type = header[1:5]
            raw_srid = header[5:]
            fmt = "<I" if byte_order else ">I"
            if len(raw_type) == 4:
                type_code = int(struct.unpack(fmt, raw_type)[0])
            else:
                type_code = 0
            if extended is None:
                # Detect the format from the SRID bit; a zero/unreadable type
                # word yields ``False``.
                extended = bool(type_code & self._SRID_FLAG)
            if extended and srid == -1:
                srid = int(struct.unpack(fmt, raw_srid)[0])
        _SpatialElement.__init__(self, data, srid, extended)

    @staticmethod
    def _read_type_word(data: str | bytes | memoryview):
        """Return ``(byte_order, struct_format, type_code)`` read from ``data``.

        ``type_code`` is ``0`` when fewer than four type bytes are available.
        """
        if isinstance(data, str):
            # SpatiaLite case: assume that the string is an hex value
            head = binascii.unhexlify(data[:10])
            byte_order, raw_type = head[0], head[1:5]
        else:
            byte_order, raw_type = data[0], data[1:5]
        fmt = "<I" if byte_order else ">I"
        type_code = int(struct.unpack(fmt, raw_type)[0]) if len(raw_type) == 4 else 0
        return byte_order, fmt, type_code

    @staticmethod
    def _wkb_to_hex(data: str | bytes | memoryview) -> str:
        """Convert WKB to hex string."""
        if not isinstance(data, str):
            data = str(binascii.hexlify(data), encoding="utf-8")
        # A ``str`` input (SpatiaLite case) is already hexadecimal text.
        return data.lower()

    @property
    def desc(self) -> str:
        """This element's description string."""
        return self._wkb_to_hex(self.data)

    @staticmethod
    def _data_from_desc(desc) -> bytes:
        """Turn a hexadecimal description string back into binary data."""
        return binascii.unhexlify(desc.encode(encoding="utf-8"))

    def as_wkb(self) -> WKBElement:
        """Return a new element in plain WKB form.

        For an extended element the SRID bit of the type word is cleared and
        the four SRID bytes are dropped; the ``srid`` attribute is kept.
        """
        if not self.extended:
            return WKBElement(self.data, self.srid)

        byte_order, fmt, type_code = self._read_type_word(self.data)
        type_code &= self._SRID_FLAG_CLEARED

        if isinstance(self.data, str):
            endianness = "little" if byte_order else "big"
            type_hex = binascii.hexlify(type_code.to_bytes(4, endianness))
            # Hex layout: 2 chars byte order, 8 chars type, 8 chars SRID.
            data = self.data[:2] + type_hex.decode("ascii") + self.data[18:]
        else:
            buffer = bytearray()
            # Binary layout: 1 byte order, 4 bytes type, 4 bytes SRID.
            pieces = (self.data[:1], struct.pack(fmt, type_code), self.data[9:])
            for piece in pieces:
                buffer.extend(piece)
            data = memoryview(buffer)
        return WKBElement(data, self.srid, extended=False)

    def as_ewkb(self) -> WKBElement:
        """Return a new element in EWKB form.

        The SRID is embedded only when the element is not already extended and
        its SRID is known (not ``-1``); otherwise the data is passed through
        unchanged.
        """
        if self.extended or self.srid == -1:
            return WKBElement(self.data, self.srid)

        byte_order, fmt, type_code = self._read_type_word(self.data)
        type_code |= self._SRID_FLAG

        data: str | memoryview
        if isinstance(self.data, str):
            endianness = "little" if byte_order else "big"
            type_hex = binascii.hexlify(type_code.to_bytes(4, endianness))
            srid_hex = binascii.hexlify(self.srid.to_bytes(4, endianness))
            data = (
                self.data[:2]
                + type_hex.decode("ascii")
                + srid_hex.decode("ascii")
                + self.data[10:]
            )
        else:
            buffer = bytearray()
            pieces = (
                self.data[:1],
                struct.pack(fmt, type_code),
                struct.pack(fmt, self.srid),
                self.data[5:],
            )
            for piece in pieces:
                buffer.extend(piece)
            data = memoryview(buffer)
        return WKBElement(data, self.srid, extended=True)
class DynamicWKBElement(WKBElement):
    """``WKBElement`` variant whose instances accept arbitrary attributes.

    Listing ``__dict__`` and ``__weakref__`` in ``__slots__`` gives instances
    the per-instance dictionary and weak-reference support that the slotted
    parent class leaves out.
    """

    __slots__ = ("__dict__", "__weakref__")
# Raster element wrappers.
class RasterElement(_SpatialElement):
    """Instances of this class wrap a ``raster`` value.

    Raster values read from the database are converted to instances of this type. In
    most cases you won't need to create ``RasterElement`` instances yourself.

    Whatever form the input has, ``data`` is stored as a hexadecimal string.

    Note::

        This class uses ``__slots__`` to restrict its attributes and improve memory efficiency by
        preventing the creation of a dynamic ``__dict__`` for each instance.
        If you require dynamic attributes or support for weak references, use the
        ``DynamicRasterElement`` subclass, which provides these capabilities.
    """

    __slots__ = ()

    geom_from_extended_version: str = "raster"

    def __init__(self, data: str | bytes | memoryview) -> None:
        """Wrap a raster WKB value given as hexadecimal text or raw binary.

        Args:
            data: The raster in WKB form, either hexadecimal (``str`` or
                bytes-like) or raw binary.
        """
        # read srid from the WKB (binary or hexadecimal format)
        # The WKB structure is documented in the file
        # raster/doc/RFC2-WellKnownBinaryFormat of the PostGIS sources.
        bin_data: str | bytes | memoryview
        try:
            # 114 hex characters = the first 57 bytes, i.e. up to the end of
            # the SRID field read below.
            bin_data = binascii.unhexlify(data[:114])
        except BinasciiError:
            # Not hexadecimal text: treat the input as raw binary and keep its
            # hexadecimal representation as the element's data.
            bin_data = data
            data = str(binascii.hexlify(data).decode(encoding="utf-8"))  # type: ignore
        else:
            if not isinstance(data, str):
                # Hexadecimal text supplied as bytes/memoryview: store it as
                # ``str`` so that ``desc`` and ``str(element)`` return text.
                data = bytes(data).decode("ascii")
        byte_order = bin_data[0]
        srid = bin_data[53:57]
        # NOTE(review): the PostGIS raster RFC appears to declare the SRID as a
        # signed 32-bit integer while it is decoded as unsigned here - confirm.
        srid = struct.unpack("<I" if byte_order else ">I", srid)[0]  # type: ignore
        _SpatialElement.__init__(self, data, int(srid), True)

    @property
    def desc(self) -> str:
        """This element's description string."""
        return self.data

    @staticmethod
    def _data_from_desc(desc):
        # The description of a raster element is the data itself.
        return desc
class DynamicRasterElement(RasterElement):
    """``RasterElement`` variant whose instances accept arbitrary attributes.

    Listing ``__dict__`` and ``__weakref__`` in ``__slots__`` gives instances
    the per-instance dictionary and weak-reference support that the slotted
    parent class leaves out.
    """

    __slots__ = ("__dict__", "__weakref__")
# Composite-type element and its SQL compilation hook.
class CompositeElement(FunctionElement):
    """Instances of this class wrap a Postgres composite type.

    Args:
        base: Passed unchanged to ``FunctionElement.__init__``; presumably the
            expression that yields the composite value (TODO confirm).
        field: Stored as ``name``; the compiler hook registered for this class
            renders it after the dot, as ``(<clauses>).<name>``.
        type_: A type class or instance, normalized with ``to_instance`` and
            stored as ``type``.
    """

    __slots__ = ("name", "type")

    inherit_cache: bool = False
    """The cache is disabled for this class."""

    def __init__(self, base, field, type_) -> None:
        # ``name`` and ``type`` are assigned before the parent initializer runs.
        self.name = field
        self.type = to_instance(type_)
        super().__init__(base)
@compiles(CompositeElement)
def _compile_pgelem(expr, compiler, **kw) -> str:
    """Render a ``CompositeElement`` as ``(<compiled clauses>).<field name>``."""
    compiled_clauses = compiler.process(expr.clauses, **kw)
    field_name = expr.name
    return f"({compiled_clauses}).{field_name}"
# Names exported by ``from <module> import *``; the module-level ``__dir__``
# below returns this same list.
__all__: list[str] = [
    "_SpatialElement",
    "CompositeElement",
    "RasterElement",
    "WKBElement",
    "WKTElement",
    "DynamicRasterElement",
    "DynamicWKBElement",
    "DynamicWKTElement",
]
def __dir__() -> list[str]:
    """Return the module's public names for ``dir()`` on the module (PEP 562).

    The ``__all__`` list object itself is returned, not a copy.
    """
    return __all__