151 lines
4.6 KiB
Python
Executable File
151 lines
4.6 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Local tile server for HK aerial map. Translates /tiles/{z}/{x}/{y}.jpg -> tiles/18_{x}_{y}.jpg
|
|
At zoom < 18: composites z=18 tiles into a single output tile.
|
|
At zoom > 18: crops from the parent z=18 tile.
|
|
"""
|
|
|
|
import http.server
|
|
import socketserver
|
|
import os
|
|
import re
|
|
import webbrowser
|
|
import threading
|
|
import io
|
|
from pathlib import Path
|
|
from functools import lru_cache
|
|
|
|
# TCP port the local HTTP server listens on.
PORT = 8080

# Directory that contains this script.
SCRIPT_DIR = Path(__file__).parent

# Bytes of a complete 1x1 GIF image, used as a placeholder response body.
TRANSPARENT_GIF = (
    b'GIF89a\x01\x00\x01\x00\x80\x00\x00\xff\xff\xff\x00\x00\x00'
    b'!\xf9\x04\x00\x00\x00\x00\x00,\x00\x00\x00\x00\x01\x00\x01'
    b'\x00\x00\x02\x02D\x01\x00;'
)

# Matches "/<layer>/<z>/<x>/<y>.<ext>" where <layer> is one of tiles,
# tiles_annotated, tiles_annotated_finetuned or tiles_annotated_base, the
# three coordinates are decimal digits, and <ext> is jpg, png or gif.
# Groups, in order: layer, z, x, y, ext.
TILE_PATTERN = re.compile(
    r'^/(tiles(?:_annotated(?:_finetuned|_base)?)?)/(\d+)/(\d+)/(\d+)\.(jpg|png|gif)$'
)

# Pillow is an optional dependency: record whether it could be imported and
# warn once at start-up when it could not.
try:
    from PIL import Image
except ImportError:
    PIL_AVAILABLE = False
    print("WARNING: Pillow not installed — zoom out won't show tiles. Run: pip install Pillow")
else:
    PIL_AVAILABLE = True
|
|
|
|
|
|
@lru_cache(maxsize=512)
def _read_tile(path_str):
    """Return the bytes stored at *path_str*, or None when the path does not exist.

    The result is memoised per path string (up to 512 entries), and that
    includes a None result for a path that was missing when first requested.
    The argument is a plain string so it can serve as the cache key.
    """
    tile_file = Path(path_str)
    return tile_file.read_bytes() if tile_file.exists() else None
|
|
|
|
|
|
# Largest supported distance, in zoom levels, from the native z=18 tiles.
# 2**8 == 256 is the output tile edge in pixels: zooming out further would
# give each z=18 tile less than one output pixel, and the number of files to
# look up grows as 4**dz. Zooming in further would need less than one source
# pixel per output tile (assuming 256 px source tiles).
_MAX_ZOOM_DELTA = 8


def _encode_jpeg(image):
    """Return *image* (a PIL image) encoded as JPEG bytes at quality 85."""
    buf = io.BytesIO()
    image.save(buf, format='JPEG', quality=85)
    return buf.getvalue()


def _crop_overzoom(layer_path, dz, x, y):
    """Cut tile (x, y) at zoom 18+dz out of its parent z=18 tile; None if unavailable."""
    scale = 1 << dz  # sub-tiles per side inside one z=18 tile
    data = _read_tile(str(layer_path / f'18_{x >> dz}_{y >> dz}.jpg'))
    if not data:
        return None
    parent = Image.open(io.BytesIO(data))
    crop_w = parent.width // scale
    crop_h = parent.height // scale
    if crop_w == 0 or crop_h == 0:
        # The parent image has fewer pixels than sub-tiles along one axis, so
        # the crop box would be empty; treat the tile as missing.
        return None
    left = (x % scale) * crop_w
    top = (y % scale) * crop_h
    region = parent.crop((left, top, left + crop_w, top + crop_h))
    return region.resize((256, 256), Image.LANCZOS)


def _composite_underzoom(layer_path, dz, x, y):
    """Mosaic the z=18 tiles under tile (x, y) at zoom 18-dz; None if none exist."""
    scale = 1 << dz                 # number of z=18 tiles per side
    tile_px = max(1, 256 // scale)  # pixels per z=18 tile in the output
    canvas = None
    for dx in range(scale):
        for dy in range(scale):
            data = _read_tile(str(layer_path / f'18_{x * scale + dx}_{y * scale + dy}.jpg'))
            if not data:
                continue
            if canvas is None:
                # Created lazily so that an area without any tiles yields None
                # instead of a solid black image.
                canvas = Image.new('RGB', (256, 256), (0, 0, 0))
            sub = Image.open(io.BytesIO(data)).resize((tile_px, tile_px), Image.LANCZOS)
            canvas.paste(sub, (dx * tile_px, dy * tile_px))
    return canvas


def build_tile(layer_path_str, z, x, y):
    """Return (bytes, content_type) for the requested tile, or (None, None) if empty.

    Args:
        layer_path_str: Directory holding the layer's ``18_{x}_{y}.jpg`` files.
        z, x, y: Tile coordinates; anything accepted by ``int()``.

    Returns:
        ``(jpeg_bytes, 'image/jpeg')`` when a tile can be produced, otherwise
        ``(None, None)``.

        * z == 18: the stored file is returned unchanged.
        * z > 18: the matching region of the parent z=18 tile is cropped and
          scaled to 256x256.
        * z < 18: all covered z=18 tiles are shrunk and pasted into one
          256x256 image; uncovered areas stay black.

        Any z other than 18 needs Pillow and must lie within
        ``_MAX_ZOOM_DELTA`` levels of 18; otherwise the tile is reported as
        empty.
    """
    layer_path = Path(layer_path_str)
    z, x, y = int(z), int(x), int(y)

    if z == 18:
        data = _read_tile(str(layer_path / f'18_{x}_{y}.jpg'))
        if data:
            return data, 'image/jpeg'
        return None, None

    if not PIL_AVAILABLE:
        return None, None

    # Bound the zoom offset before computing 1 << dz: an arbitrarily large or
    # small z would otherwise mean an enormous shift, an empty crop box, or a
    # composite loop over billions of file names.
    if abs(z - 18) > _MAX_ZOOM_DELTA:
        return None, None

    if z > 18:
        image = _crop_overzoom(layer_path, z - 18, x, y)
    else:
        image = _composite_underzoom(layer_path, 18 - z, x, y)

    if image is None:
        return None, None
    return _encode_jpeg(image), 'image/jpeg'
|
|
|
|
|
|
class Handler(http.server.SimpleHTTPRequestHandler):
    """Request handler that answers tile URLs itself and defers the rest to the base class."""

    def do_GET(self):
        """Handle a GET request.

        The query string is ignored. Paths matching TILE_PATTERN are answered
        with the tile from build_tile(), or with the placeholder GIF when
        build_tile() yields no data; both are sent with status 200. Any other
        path is handled by SimpleHTTPRequestHandler.
        """
        request_path = self.path.split('?')[0]
        match = TILE_PATTERN.match(request_path)
        if match is None:
            super().do_GET()
            return

        # The requested extension is captured by the pattern but not used.
        layer, z, x, y, _ext = match.groups()
        data, ctype = build_tile(str(SCRIPT_DIR / layer), z, x, y)
        if data:
            self._send_image(data, ctype, cache_control='public, max-age=3600')
        else:
            self._send_image(TRANSPARENT_GIF, 'image/gif')

    def _send_image(self, body, content_type, cache_control=None):
        """Write a 200 response carrying *body*; add Cache-Control only when given."""
        self.send_response(200)
        self.send_header('Content-Type', content_type)
        self.send_header('Content-Length', len(body))
        if cache_control is not None:
            self.send_header('Cache-Control', cache_control)
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, fmt, *args):
        """Print a log line for every request except tile requests."""
        if TILE_PATTERN.match(self.path.split('?')[0]):
            return
        print(f' {self.address_string()} {fmt % args}')
|
|
|
|
|
|
class _ReusableTCPServer(socketserver.TCPServer):
    """TCPServer with address reuse enabled.

    The flag has to be a class attribute: TCPServer binds its socket inside
    the constructor, so assigning it on the instance afterwards is too late
    to have any effect.
    """

    allow_reuse_address = True


def main():
    """Serve SCRIPT_DIR on PORT and open map.html in the default browser.

    Changes the working directory to SCRIPT_DIR, starts the HTTP server on
    all interfaces, schedules the browser to open 0.4 s later, and then
    blocks until interrupted with Ctrl+C.
    """
    os.chdir(SCRIPT_DIR)

    with _ReusableTCPServer(('', PORT), Handler) as httpd:
        url = f'http://localhost:{PORT}/map.html'
        print(f'HK Aerial Map → {url}')
        print('Press Ctrl+C to stop.\n')
        # Open the browser from a timer thread; serve_forever() below blocks
        # this thread.
        threading.Timer(0.4, webbrowser.open, args=(url,)).start()
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            print('\nServer stopped.')


if __name__ == '__main__':
    main()
|