#!/usr/bin/env python3 # vim:tabstop=4 softtabstop=4 shiftwidth=4 textwidth=160 smarttab expandtab colorcolumn=160 # # Copyright (C) 2021 Daniel Friesel # # SPDX-License-Identifier: BSD-2-Clause import argparse import aiohttp from aiohttp import web from datetime import datetime import dateutil.parser from jinja2 import Environment, FileSystemLoader, select_autoescape import geojson import json import os import shapely.geometry headers = { "Access-Control-Allow-Origin": "*", "Content-Type": "text/html; charset=utf-8", } ajax_headers = { "Access-Control-Allow-Origin": "*", "Content-Type": "application/json; charset=utf-8", } db_rest_api = os.getenv("NVM_DB_REST_API", "https://v5.db.transport.rest") env = Environment(loader=FileSystemLoader("templates"), autoescape=select_autoescape()) apis = None class TransportAPIs: def __init__(self): self.apis = list() base = "ext/transport-apis/data/de" for filename in os.listdir(base): with open(f"{base}/{filename}", "r") as f: data = json.load(f) if data["type"].get("efa", False): try: area = data["coverage"]["realtimeCoverage"]["area"] except KeyError: continue # surely there must be a more elegant way to load a JSON sub-dict as GeoJSON area = geojson.loads(json.dumps(area)) self.apis.append((data["options"], shapely.geometry.shape(area))) def get_efa(self, location): location = shapely.geometry.Point(*location) for api, area in self.apis: if area.contains(location): return api return None class Departure: def __init__(self, obj): self.__dict__.update(obj) if not "cancelled" in obj: self.cancelled = False self.classes = str() self.station_name = None self.stop_name = obj.get("stop", dict()).get("name", None) self.station_name = obj.get("station", dict()).get("name", self.stop_name) try: self.location = ( obj["stop"]["location"]["longitude"], obj["stop"]["location"]["latitude"], ) except KeyError: self.location = None if "," in self.direction: self.direction, self.suffix = self.direction.split(",", maxsplit=1) else: self.suffix = None if "line" in obj: self.line = Line(self.line) try: self.when = dateutil.parser.parse(self.when) except TypeError: self.when = None try: self.plannedWhen = dateutil.parser.parse(self.plannedWhen) except TypeError: self.plannedWhen = None if self.cancelled: self.classes += " cancelled" if self.when: self.sort_by = self.when.timestamp() elif self.plannedWhen: self.sort_by = self.plannedWhen.timestamp() else: self.sort_by = 0 if self.delay: self.delay = self.delay // 60 self.delay = f"{self.delay:+.0f}" def set_relative(self, now): minutes = (self.sort_by - now) // 60 if minutes < 1: self.relativeWhen = "sofort" elif minutes < 60: self.relativeWhen = f"{minutes:.0f} min" else: self.relativeWhen = f"{minutes//60:.0f}h {minutes%60:.0f}min" class Line: def __init__(self, obj): self.__dict__.update(obj) self.css_class = str() if self.product.startswith("national"): self.css_class = "longdistance" elif self.product == "tram": # str.removeprefix requires Python ≥ 3.9 if self.name.startswith("STR "): self.name = self.name[4:] self.css_class = "tram" elif self.product == "suburban": self.css_class = "suburban" elif self.product == "subway": self.css_class = "subway" elif self.product == "bus": if self.name.startswith("Bus "): self.name = self.name[4:] self.css_class = "bus" async def show_departure_board(request): try: eva = int(request.match_info.get("eva")) except ValueError: return web.HTTPBadRequest(text="EVA must be a number at the moment") async with aiohttp.ClientSession() as session: async with session.get( f"{db_rest_api}/stops/{eva}/departures?results=60&duration=120&stopovers=true" ) as response: departures = await response.json() if type(departures) is dict and departures.get("error", False): return web.HTTPNotFound(body=json.dumps(departures), headers=headers) departures = list(map(Departure, departures)) if len(departures): efa_endpoint = apis.get_efa(departures[0].location) station_name_freq = dict() now = datetime.now().timestamp() for departure in departures: departure.set_relative(now) station_name_freq[departure.station_name] = ( station_name_freq.get(departure.station_name, 0) + 1 ) if station_name_freq: station_name = max(station_name_freq.keys(), key=lambda k: station_name_freq[k]) else: station_name = "NVM" departure_board = env.get_template("departure_list.html") return web.Response( body=departure_board.render(title=station_name, departures=departures), headers=headers, ) async def redirect_to_departure_board(request): stop_name = request.query["name"] async with aiohttp.ClientSession() as session: async with session.get( f"{db_rest_api}/locations?query={stop_name}&poi=false&addresses=false" ) as response: stops = await response.json() stops_page = env.get_template("stops.html") return web.Response( body=stops_page.render(title=f"Suche nach „{stop_name}“", stops=stops), headers=headers, ) async def show_landing_page(request): landing_page = env.get_template("landing_page.html") return web.Response( body=landing_page.render(title="NVM"), headers=headers, ) async def ajax_geolocation(request): request_data = await request.json() lat = request_data["lat"] lon = request_data["lon"] async with aiohttp.ClientSession() as session: async with session.get( f"{db_rest_api}/stops/nearby?latitude={lat}&longitude={lon}" ) as response: departures = await response.json() return web.Response( body=json.dumps(departures), headers=ajax_headers, ) if __name__ == "__main__": parser = argparse.ArgumentParser(description="eva to efa gateway") parser.add_argument("--port", type=int, metavar="PORT", default=8080) parser.add_argument("--prefix", type=str, metavar="PATH", default="/") args = parser.parse_args() apis = TransportAPIs() app = web.Application() app.router.add_get(args.prefix, show_landing_page) app.router.add_get(f"{args.prefix}board/{{eva}}", show_departure_board) app.router.add_post(f"{args.prefix}geolocation", ajax_geolocation) app.router.add_get(f"{args.prefix}find/stop", redirect_to_departure_board) app.router.add_static(f"{args.prefix}static", "static") web.run_app(app, host="localhost", port=args.port)