Note
Go to the end to download the full example code.
Decipher RasterΒΆ
The RasterElement objects store the Raster data in WKB form. When using rasters it is usually better to convert them into TIFF, PNG, JPEG or whatever. Nevertheless, it is possible to decipher the WKB to get a 2D list of values. This example uses SQLAlchemy ORM queries.
10 import binascii
11 import struct
12
13 import pytest
14 from sqlalchemy import Column
15 from sqlalchemy import Integer
16 from sqlalchemy import MetaData
17 from sqlalchemy.orm import declarative_base
18
19 from geoalchemy2 import Raster
20 from geoalchemy2 import WKTElement
21
22 # Tests imports
23 from tests import test_only_with_dialects
24
25 metadata = MetaData()
26 Base = declarative_base(metadata=metadata)
27
28
29 class Ocean(Base): # type: ignore
30 __tablename__ = "ocean"
31 id = Column(Integer, primary_key=True)
32 rast = Column(Raster)
33
34 def __init__(self, rast):
35 self.rast = rast
36
37
38 def _format_e(endianness, struct_format):
39 return _ENDIANNESS[endianness] + struct_format
40
41
42 def wkbHeader(raw):
43 # Function to decipher the WKB header
44 # See http://trac.osgeo.org/postgis/browser/trunk/raster/doc/RFC2-WellKnownBinaryFormat
45
46 header = {}
47
48 header["endianness"] = struct.unpack("b", raw[0:1])[0]
49
50 e = header["endianness"]
51 header["version"] = struct.unpack(_format_e(e, "H"), raw[1:3])[0]
52 header["nbands"] = struct.unpack(_format_e(e, "H"), raw[3:5])[0]
53 header["scaleX"] = struct.unpack(_format_e(e, "d"), raw[5:13])[0]
54 header["scaleY"] = struct.unpack(_format_e(e, "d"), raw[13:21])[0]
55 header["ipX"] = struct.unpack(_format_e(e, "d"), raw[21:29])[0]
56 header["ipY"] = struct.unpack(_format_e(e, "d"), raw[29:37])[0]
57 header["skewX"] = struct.unpack(_format_e(e, "d"), raw[37:45])[0]
58 header["skewY"] = struct.unpack(_format_e(e, "d"), raw[45:53])[0]
59 header["srid"] = struct.unpack(_format_e(e, "i"), raw[53:57])[0]
60 header["width"] = struct.unpack(_format_e(e, "H"), raw[57:59])[0]
61 header["height"] = struct.unpack(_format_e(e, "H"), raw[59:61])[0]
62
63 return header
64
65
66 def read_band(data, offset, pixtype, height, width, endianness=1):
67 ptype, _, psize = _PTYPE[pixtype]
68 pix_data = data[offset + 1 : offset + 1 + width * height * psize]
69 band = [
70 [
71 struct.unpack(
72 _format_e(endianness, ptype),
73 pix_data[(i * width + j) * psize : (i * width + j + 1) * psize],
74 )[0]
75 for j in range(width)
76 ]
77 for i in range(height)
78 ]
79 return band
80
81
82 def read_band_numpy(data, offset, pixtype, height, width, endianness=1):
83 import numpy as np # noqa
84
85 _, dtype, psize = _PTYPE[pixtype]
86 dt = np.dtype(dtype)
87 dt = dt.newbyteorder(_ENDIANNESS[endianness])
88 band = np.frombuffer(data, dtype=dtype, count=height * width, offset=offset + 1)
89 band = np.reshape(band, ((height, width)))
90 return band
91
92
93 _PTYPE = {
94 0: ["?", "?", 1],
95 1: ["B", "B", 1],
96 2: ["B", "B", 1],
97 3: ["b", "b", 1],
98 4: ["B", "B", 1],
99 5: ["h", "i2", 2],
100 6: ["H", "u2", 2],
101 7: ["i", "i4", 4],
102 8: ["I", "u4", 4],
103 10: ["f", "f4", 4],
104 11: ["d", "f8", 8],
105 }
106
107 _ENDIANNESS = {
108 0: ">",
109 1: "<",
110 }
111
112
113 def wkbImage(raster_data, use_numpy=False):
114 """Function to decipher the WKB raster data"""
115 # Get binary data
116 raw = binascii.unhexlify(raster_data)
117
118 # Read header
119 h = wkbHeader(bytes(raw))
120 e = h["endianness"]
121
122 img = [] # array to store image bands
123 offset = 61 # header raw length in bytes
124 band_size = h["width"] * h["height"] # number of pixels in each band
125
126 for _ in range(h["nbands"]):
127 # Determine pixtype for this band
128 pixtype = struct.unpack(_format_e(e, "b"), raw[offset : offset + 1])[0] - 64
129
130 # Read data with either pure Python or Numpy
131 if use_numpy:
132 band = read_band_numpy(raw, offset, pixtype, h["height"], h["width"])
133 else:
134 band = read_band(raw, offset, pixtype, h["height"], h["width"])
135
136 # Store the result
137 img.append(band)
138 offset = offset + 2 + band_size
139
140 return img
141
142
143 @test_only_with_dialects("postgresql")
144 class TestDecipherRaster:
145 @pytest.mark.parametrize(
146 "pixel_type",
147 [
148 "1BB",
149 "2BUI",
150 "4BUI",
151 "8BSI",
152 "8BUI",
153 "16BSI",
154 "16BUI",
155 "32BSI",
156 "32BUI",
157 "32BF",
158 "64BF",
159 ],
160 )
161 def test_decipher_raster(self, pixel_type, session, conn):
162 """Create a raster and decipher it"""
163 metadata.drop_all(conn, checkfirst=True)
164 metadata.create_all(conn)
165
166 # Create a new raster
167 polygon = WKTElement("POLYGON((0 0,1 1,0 1,0 0))", srid=4326)
168 o = Ocean(polygon.ST_AsRaster(5, 6, pixel_type))
169 session.add(o)
170 session.flush()
171
172 # Decipher data from each raster
173 image = wkbImage(o.rast.data)
174
175 # Define expected result
176 expected = [
177 [0, 1, 1, 1, 1],
178 [1, 1, 1, 1, 1],
179 [0, 1, 1, 1, 0],
180 [0, 1, 1, 0, 0],
181 [0, 1, 0, 0, 0],
182 [0, 0, 0, 0, 0],
183 ]
184
185 # Check results
186 band = image[0]
187 assert band == expected