Decipher RasterΒΆ

The RasterElement objects store the Raster data in WKB form. When using rasters it is usually better to convert them into TIFF, PNG, JPEG or whatever. Nevertheless, it is possible to decipher the WKB to get a 2D list of values. This example uses SQLAlchemy ORM queries.

 10 import binascii
 11 import struct
 12
 13 import pytest
 14 from sqlalchemy import Column
 15 from sqlalchemy import Integer
 16 from sqlalchemy import MetaData
 17 from sqlalchemy.orm import declarative_base
 18
 19 from geoalchemy2 import Raster
 20 from geoalchemy2 import WKTElement
 21
 22 # Tests imports
 23 from tests import test_only_with_dialects
 24
 25 metadata = MetaData()
 26 Base = declarative_base(metadata=metadata)
 27
 28
 29 class Ocean(Base):  # type: ignore
 30     __tablename__ = "ocean"
 31     id = Column(Integer, primary_key=True)
 32     rast = Column(Raster)
 33
 34     def __init__(self, rast):
 35         self.rast = rast
 36
 37
 38 def _format_e(endianness, struct_format):
 39     return _ENDIANNESS[endianness] + struct_format
 40
 41
 42 def wkbHeader(raw):
 43     # Function to decipher the WKB header
 44     # See http://trac.osgeo.org/postgis/browser/trunk/raster/doc/RFC2-WellKnownBinaryFormat
 45
 46     header = {}
 47
 48     header["endianness"] = struct.unpack("b", raw[0:1])[0]
 49
 50     e = header["endianness"]
 51     header["version"] = struct.unpack(_format_e(e, "H"), raw[1:3])[0]
 52     header["nbands"] = struct.unpack(_format_e(e, "H"), raw[3:5])[0]
 53     header["scaleX"] = struct.unpack(_format_e(e, "d"), raw[5:13])[0]
 54     header["scaleY"] = struct.unpack(_format_e(e, "d"), raw[13:21])[0]
 55     header["ipX"] = struct.unpack(_format_e(e, "d"), raw[21:29])[0]
 56     header["ipY"] = struct.unpack(_format_e(e, "d"), raw[29:37])[0]
 57     header["skewX"] = struct.unpack(_format_e(e, "d"), raw[37:45])[0]
 58     header["skewY"] = struct.unpack(_format_e(e, "d"), raw[45:53])[0]
 59     header["srid"] = struct.unpack(_format_e(e, "i"), raw[53:57])[0]
 60     header["width"] = struct.unpack(_format_e(e, "H"), raw[57:59])[0]
 61     header["height"] = struct.unpack(_format_e(e, "H"), raw[59:61])[0]
 62
 63     return header
 64
 65
 66 def read_band(data, offset, pixtype, height, width, endianness=1):
 67     ptype, _, psize = _PTYPE[pixtype]
 68     pix_data = data[offset + 1 : offset + 1 + width * height * psize]
 69     band = [
 70         [
 71             struct.unpack(
 72                 _format_e(endianness, ptype),
 73                 pix_data[(i * width + j) * psize : (i * width + j + 1) * psize],
 74             )[0]
 75             for j in range(width)
 76         ]
 77         for i in range(height)
 78     ]
 79     return band
 80
 81
 82 def read_band_numpy(data, offset, pixtype, height, width, endianness=1):
 83     import numpy as np  # noqa
 84
 85     _, dtype, psize = _PTYPE[pixtype]
 86     dt = np.dtype(dtype)
 87     dt = dt.newbyteorder(_ENDIANNESS[endianness])
 88     band = np.frombuffer(data, dtype=dtype, count=height * width, offset=offset + 1)
 89     band = np.reshape(band, ((height, width)))
 90     return band
 91
 92
 93 _PTYPE = {
 94     0: ["?", "?", 1],
 95     1: ["B", "B", 1],
 96     2: ["B", "B", 1],
 97     3: ["b", "b", 1],
 98     4: ["B", "B", 1],
 99     5: ["h", "i2", 2],
100     6: ["H", "u2", 2],
101     7: ["i", "i4", 4],
102     8: ["I", "u4", 4],
103     10: ["f", "f4", 4],
104     11: ["d", "f8", 8],
105 }
106
107 _ENDIANNESS = {
108     0: ">",
109     1: "<",
110 }
111
112
113 def wkbImage(raster_data, use_numpy=False):
114     """Function to decipher the WKB raster data"""
115     # Get binary data
116     raw = binascii.unhexlify(raster_data)
117
118     # Read header
119     h = wkbHeader(bytes(raw))
120     e = h["endianness"]
121
122     img = []  # array to store image bands
123     offset = 61  # header raw length in bytes
124     band_size = h["width"] * h["height"]  # number of pixels in each band
125
126     for _ in range(h["nbands"]):
127         # Determine pixtype for this band
128         pixtype = struct.unpack(_format_e(e, "b"), raw[offset : offset + 1])[0] - 64
129
130         # Read data with either pure Python or Numpy
131         if use_numpy:
132             band = read_band_numpy(raw, offset, pixtype, h["height"], h["width"])
133         else:
134             band = read_band(raw, offset, pixtype, h["height"], h["width"])
135
136         # Store the result
137         img.append(band)
138         offset = offset + 2 + band_size
139
140     return img
141
142
143 @test_only_with_dialects("postgresql")
144 class TestDecipherRaster:
145     @pytest.mark.parametrize(
146         "pixel_type",
147         [
148             "1BB",
149             "2BUI",
150             "4BUI",
151             "8BSI",
152             "8BUI",
153             "16BSI",
154             "16BUI",
155             "32BSI",
156             "32BUI",
157             "32BF",
158             "64BF",
159         ],
160     )
161     def test_decipher_raster(self, pixel_type, session, conn):
162         """Create a raster and decipher it"""
163         metadata.drop_all(conn, checkfirst=True)
164         metadata.create_all(conn)
165
166         # Create a new raster
167         polygon = WKTElement("POLYGON((0 0,1 1,0 1,0 0))", srid=4326)
168         o = Ocean(polygon.ST_AsRaster(5, 6, pixel_type))
169         session.add(o)
170         session.flush()
171
172         # Decipher data from each raster
173         image = wkbImage(o.rast.data)
174
175         # Define expected result
176         expected = [
177             [0, 1, 1, 1, 1],
178             [1, 1, 1, 1, 1],
179             [0, 1, 1, 1, 0],
180             [0, 1, 1, 0, 0],
181             [0, 1, 0, 0, 0],
182             [0, 0, 0, 0, 0],
183         ]
184
185         # Check results
186         band = image[0]
187         assert band == expected

Gallery generated by Sphinx-Gallery