#!/usr/bin/env python3
# vim:tabstop=4 softtabstop=4 shiftwidth=4 textwidth=160 smarttab expandtab colorcolumn=160
#
# Copyright (C) 2021 Daniel Friesel
#
# SPDX-License-Identifier: BSD-2-Clause
"""Departure board web service ("eva to efa gateway").

Departures are fetched from a db-rest style HTTP API (``NVM_DB_REST_API``) and,
when a matching EFA endpoint covers the stop's location, enriched with details
(currently only the platform) from that EFA server.
"""

import argparse
from datetime import datetime
import json
import logging
import os
import sys
from urllib.parse import quote

import aiohttp
from aiohttp import web
import dateutil.parser
import geojson
from jinja2 import Environment, FileSystemLoader, select_autoescape
import shapely.geometry

# Response headers for HTML pages.
headers = {
    "Access-Control-Allow-Origin": "*",
    "Content-Type": "text/html; charset=utf-8",
}

# Response headers for JSON (AJAX) replies.
ajax_headers = {
    "Access-Control-Allow-Origin": "*",
    "Content-Type": "application/json; charset=utf-8",
}

db_rest_api = os.getenv("NVM_DB_REST_API", "https://v5.db.transport.rest")

env = Environment(loader=FileSystemLoader("templates"), autoescape=select_autoescape())

# TransportAPIs instance; assigned in the __main__ block before the server starts.
apis = None


class EFA:
    """Minimal client for the departure monitor (XML_DM_REQUEST) of an EFA server."""

    def __init__(self, url):
        """Remember the request URL below base URL *url* and the static form fields."""
        self.url = url + "/XML_DM_REQUEST"
        self.post_data = {
            "command": "",
            "deleteAssignedStops_dm": "1",
            "help": "Hilfe",
            "itdLPxx_id_dm": ":dm",
            "itdLPxx_mapState_dm": "",
            "itdLPxx_mdvMap2_dm": "",
            "itdLPxx_mdvMap_dm": "3406199:401077:NAV3",
            "itdLPxx_transpCompany": "vrr",
            "itdLPxx_view": "",
            "language": "de",
            "mode": "direct",
            "nameInfo_dm": "invalid",
            "nameState_dm": "empty",
            "outputFormat": "JSON",
            "ptOptionsActive": "1",
            "requestID": "0",
            "reset": "neue Anfrage",
            "sessionID": "0",
            "submitButton": "anfordern",
            "typeInfo_dm": "invalid",
            "type_dm": "stop",
            "useProxFootSearch": "0",
            "useRealtime": "1",
        }

    async def get_departures(self, place, name, ts):
        """Return the departures of stop *name* (optionally in *place*) at time *ts*.

        *ts* must provide ``day``, ``month``, ``year``, ``hour`` and ``minute``.
        Returns a list of EFADeparture objects; the list is empty when the
        response carries no usable ``departureList``.

        Raises RuntimeError, with the aiohttp response as its only argument,
        when the response body is not valid JSON.
        """
        # Build the form per request so the shared template in self.post_data
        # is never mutated (and no stale place_* fields can leak between calls).
        form = dict(self.post_data)
        form.update(
            {
                "itdDateDay": ts.day,
                "itdDateMonth": ts.month,
                "itdDateYear": ts.year,
                "itdTimeHour": ts.hour,
                "itdTimeMinute": ts.minute,
                "name_dm": name,
            }
        )
        if place is not None:
            form.update(
                {"placeInfo_dm": "invalid", "placeState_dm": "empty", "place_dm": place}
            )

        logging.debug(f"Requesting '{place}' '{name}' from {self.url}")

        async with aiohttp.ClientSession() as session:
            async with session.post(self.url, data=form) as response:
                # EFA may return JSON with a text/html Content-Type, which response.json() does not like.
                try:
                    reply = json.loads(await response.text())
                except json.decoder.JSONDecodeError:
                    raise RuntimeError(response) from None

        departure_list = reply.get("departureList") if isinstance(reply, dict) else None
        if departure_list is None:
            logging.debug("EFA response has no departureList")
            return list()

        return [EFADeparture(entry) for entry in departure_list]


class EFADeparture:
    """A single departure as reported by an EFA server."""

    def __init__(self, data):
        """Extract line, platform, direction and planned time from EFA entry *data*."""
        serving_line = data["servingLine"]
        self.line = serving_line["symbol"]
        self.train_no = serving_line.get("trainNum", None)
        self.occupancy = data.get("occupancy", None)
        self.platform = data.get("platform", None)  # platformName?
        self.direction = serving_line.get("direction", None)

        # Ensure compatibility with DB HAFAS
        if self.line.startswith("U") and " " not in self.line:
            self.line = "U " + self.line[1:]

        # Named "stamp" rather than "datetime" so the imported class is not shadowed.
        stamp = data["dateTime"]
        year = int(stamp["year"])
        month = int(stamp["month"])
        day = int(stamp["day"])
        hour = int(stamp["hour"])
        minute = int(stamp["minute"])

        # Same minute-resolution format as Departure.iso8601, used for matching.
        self.iso8601 = f"{year:04d}-{month:02d}-{day:02d}T{hour:02d}:{minute:02d}"

    def __repr__(self):
        return f"EFADeparture<{self.iso8601} {self.line} -> {self.direction}>"


class TransportAPIs:
    """Index of EFA endpoints and the areas for which they offer realtime data."""

    def __init__(self):
        """Load all EFA API descriptors from ext/transport-apis/data/de."""
        self.apis = list()
        base = "ext/transport-apis/data/de"
        for filename in os.listdir(base):
            if not filename.endswith(".json"):
                continue
            with open(f"{base}/{filename}", "r", encoding="utf-8") as f:
                data = json.load(f)
            if not data.get("type", dict()).get("efa", False):
                continue
            try:
                options = data["options"]
                area = data["coverage"]["realtimeCoverage"]["area"]
            except KeyError:
                # Descriptors without options or realtime coverage are of no use here.
                continue
            # surely there must be a more elegant way to load a JSON sub-dict as GeoJSON
            area = geojson.loads(json.dumps(area))
            self.apis.append((options, shapely.geometry.shape(area)))

    def get_efa(self, location):
        """Return the options of the first API whose area contains *location*.

        *location* is a (longitude, latitude) pair or None. Returns None when
        the location is unknown or not covered by any API.
        """
        if location is None:
            return None
        point = shapely.geometry.Point(*location)
        for api, area in self.apis:
            if area.contains(point):
                return api
        return None


class Departure:
    """A departure from the db-rest API, prepared for template rendering.

    All keys of the API object become attributes; a few of them (line, when,
    plannedWhen, delay, direction) are replaced by processed values.
    """

    def __init__(self, obj):
        """Build the departure from the decoded JSON object *obj*."""
        self.__dict__.update(obj)

        if "cancelled" not in obj:
            self.cancelled = False

        self.classes = str()

        # "stop" / "station" may be absent or JSON null.
        stop = obj.get("stop") or dict()
        station = obj.get("station") or dict()
        self.stop_name = stop.get("name", None)
        self.station_name = station.get("name", self.stop_name)
        self.quoted_stop_name = quote(self.stop_name) if self.stop_name else None

        try:
            self.location = (
                stop["location"]["longitude"],
                stop["location"]["latitude"],
            )
        except (KeyError, TypeError):
            # TypeError: "location" present but null.
            self.location = None

        direction = obj.get("direction")
        if direction and "," in direction:
            self.direction, self.suffix = direction.split(",", maxsplit=1)
        else:
            self.direction = direction
            self.suffix = None

        if obj.get("line") is not None:
            self.line = Line(obj["line"])

        # dateutil raises TypeError for a missing (None) timestamp.
        try:
            self.when = dateutil.parser.parse(obj.get("when"))
        except TypeError:
            self.when = None

        try:
            self.plannedWhen = dateutil.parser.parse(obj.get("plannedWhen"))
            self.iso8601 = self.plannedWhen.strftime("%Y-%m-%dT%H:%M")
        except TypeError:
            self.plannedWhen = None
            self.iso8601 = None

        if self.cancelled:
            self.classes += " cancelled"

        if self.when:
            self.sort_by = self.when.timestamp()
        elif self.plannedWhen:
            self.sort_by = self.plannedWhen.timestamp()
        else:
            self.sort_by = 0

        # Convert a non-zero delay from seconds to a signed string of whole minutes.
        self.delay = obj.get("delay")
        if self.delay:
            self.delay = f"{self.delay // 60:+.0f}"

    def __repr__(self):
        line = getattr(self, "line", None)
        return f"Departure<{self.iso8601} {line} -> {self.direction}>"

    def set_relative(self, now):
        """Set ``relativeWhen`` to the time left until departure, seen from timestamp *now*."""
        minutes = (self.sort_by - now) // 60
        if minutes < 1:
            self.relativeWhen = "sofort"
        elif minutes < 60:
            self.relativeWhen = f"{minutes:.0f} min"
        else:
            self.relativeWhen = f"{minutes//60:.0f}h {minutes%60:.0f}min"

    def add_efa(self, candidates):
        """Merge EFA details if exactly one of *candidates* matches time and line."""
        line_name = getattr(getattr(self, "line", None), "name", None)
        matches = [
            candidate
            for candidate in candidates
            if candidate.iso8601 == self.iso8601 and candidate.line == line_name
        ]
        if len(matches) == 1:
            self._add_efa(matches[0])
        # else: TODO check destination

    def _add_efa(self, efa_departure):
        """Take over the platform from *efa_departure* unless one is already known."""
        if efa_departure.platform and not getattr(self, "platform", None):
            self.platform = efa_departure.platform


class Line:
    """A line from the db-rest API with a CSS class derived from its product."""

    # product -> (CSS class, name prefix to strip or None)
    _PRODUCTS = {
        "tram": ("tram", "STR "),
        "suburban": ("suburban", None),
        "subway": ("subway", None),
        "bus": ("bus", "Bus "),
    }

    def __init__(self, obj):
        """Copy all keys of *obj* to attributes and set ``css_class``."""
        self.__dict__.update(obj)
        self.css_class = str()

        product = obj.get("product") or ""
        if product.startswith("national"):
            self.css_class = "longdistance"
        elif product in self._PRODUCTS:
            self.css_class, prefix = self._PRODUCTS[product]
            # str.removeprefix requires Python ≥ 3.9
            if prefix and self.name.startswith(prefix):
                self.name = self.name[len(prefix) :]

    def __repr__(self):
        return self.name


async def show_departure_board(request, eva=None):
    """Render the departure board for a stop.

    *eva* is the stop ID; when None it is taken from the ``eva`` URL component
    and must be numeric (HTTP 400 otherwise). An error object from the
    departure API results in HTTP 404 with that object as body.
    """
    if eva is None:
        try:
            eva = int(request.match_info.get("eva"))
        except (TypeError, ValueError):
            # Raised rather than returned: returning HTTP exceptions is deprecated in aiohttp.
            raise web.HTTPBadRequest(
                text="EVA must be a number at the moment"
            ) from None

    request_url = (
        f"{db_rest_api}/stops/{eva}/departures?results=60&duration=120&stopovers=true"
    )
    logging.debug(f"Requesting '{eva}' departures from {request_url}")
    async with aiohttp.ClientSession() as session:
        async with session.get(request_url) as response:
            departures = await response.json()

    if isinstance(departures, dict) and departures.get("error", False):
        raise web.HTTPNotFound(body=json.dumps(departures), headers=headers)

    now = datetime.now()
    now_ts = now.timestamp()

    departures = list(map(Departure, departures))

    # The board title is the most frequent station name among the departures.
    station_name_freq = dict()
    for departure in departures:
        departure.set_relative(now_ts)
        station_name_freq[departure.station_name] = (
            station_name_freq.get(departure.station_name, 0) + 1
        )

    station_name = None
    if station_name_freq:
        station_name = max(station_name_freq, key=station_name_freq.get)
    if station_name is None:
        station_name = "NVM"

    efa_by_iso8601 = dict()
    warning = None

    if departures and ", " in station_name:
        # Split at the last ", " so names with several commas do not raise.
        # NOTE(review): assumes the place is the final component — confirm.
        name, place = station_name.rsplit(", ", maxsplit=1)
        efa_endpoint = apis.get_efa(departures[0].location)
        if efa_endpoint:
            efa = EFA(efa_endpoint["endpoint"])
            try:
                for efa_departure in await efa.get_departures(place, name, now):
                    efa_by_iso8601.setdefault(efa_departure.iso8601, list()).append(
                        efa_departure
                    )
            except RuntimeError as e:
                (response,) = e.args
                text = await response.text()
                warning = {
                    "lead": "Detailabfrage fehlgeschlagen:",
                    "body": "Angaben sind möglicherweise unvollständig",
                    "code": f"""EFA server {efa_endpoint["endpoint"]} returned HTTP {response.status} '{text[:10224]}'""",
                }
            except aiohttp.ClientError as e:
                # EFA data is an optional extra: a transport error must not take down the board.
                warning = {
                    "lead": "Detailabfrage fehlgeschlagen:",
                    "body": "Angaben sind möglicherweise unvollständig",
                    "code": f"""EFA server {efa_endpoint["endpoint"]} request failed: {e}""",
                }

    for departure in departures:
        departure.add_efa(efa_by_iso8601.get(departure.iso8601, list()))

    departures = sorted(departures, key=lambda departure: departure.sort_by)

    departure_board = env.get_template("departure_list.html")
    return web.Response(
        body=departure_board.render(
            title=station_name, departures=departures, warning=warning
        ),
        headers=headers,
    )


async def redirect_to_departure_board(request):
    """Look up the stop given by query parameter ``name``.

    An exact (case-insensitive) match is answered with its departure board,
    anything else with a list of candidate stops. A failed lookup renders the
    landing page with an error and HTTP 500.
    """
    try:
        stop_name = request.query["name"]
    except KeyError:
        raise web.HTTPBadRequest(text="Missing query parameter: name") from None

    request_url = f"{db_rest_api}/locations"
    # Passed as params so aiohttp escapes the user-supplied name; interpolating
    # it into the URL would let "&" or "#" alter the upstream query.
    params = {"query": stop_name, "poi": "false", "addresses": "false"}
    logging.debug(f"Requesting stops matching '{stop_name}' from {request_url}")
    async with aiohttp.ClientSession() as session:
        async with session.get(request_url, params=params) as response:
            if response.status != 200:
                error = await response.text()
                landing_page = env.get_template("landing_page.html")
                return web.Response(
                    body=landing_page.render(
                        title="NVM",
                        error={
                            "lead": "Haltestellensuche fehlgeschlagen",
                            "body": "",
                            "code": f"Server returned HTTP {response.status} '{error[:10224]}'",
                        },
                    ),
                    headers=headers,
                    status=500,
                )
            stops = await response.json()

    wanted = stop_name.lower()
    for stop in stops:
        if wanted == (stop.get("name") or "").lower():
            return await show_departure_board(request, stop["id"])

    stops_page = env.get_template("stops.html")
    return web.Response(
        body=stops_page.render(title=f"Suche nach „{stop_name}“", stops=stops),
        headers=headers,
    )


async def show_landing_page(request):
    """Render the landing page."""
    landing_page = env.get_template("landing_page.html")
    return web.Response(
        body=landing_page.render(title="NVM"),
        headers=headers,
    )


async def ajax_geolocation(request):
    """Return the stops near the ``lat`` / ``lon`` given in the JSON request body.

    The reply is the JSON document of the upstream API, or
    ``{"error": true, "msg": ...}`` with HTTP 400 (malformed request) or
    HTTP 500 (upstream failure).
    """
    try:
        request_data = await request.json()
        # float() also ensures that only numbers end up in the upstream request.
        lat = float(request_data["lat"])
        lon = float(request_data["lon"])
    except (json.JSONDecodeError, KeyError, TypeError, ValueError):
        return web.Response(
            body=json.dumps(
                {"error": True, "msg": "Request must be JSON with numeric lat and lon"}
            ),
            headers=ajax_headers,
            status=400,
        )

    request_url = f"{db_rest_api}/stops/nearby"
    params = {"latitude": str(lat), "longitude": str(lon), "results": "10"}
    logging.debug(f"Requesting stops near {lat}/{lon} from {request_url}")
    async with aiohttp.ClientSession() as session:
        async with session.get(request_url, params=params) as response:
            if response.status != 200:
                text = await response.text()
                return web.Response(
                    body=json.dumps(
                        {
                            "error": True,
                            "msg": f"HTTP {response.status} '{text[:1024]}'",
                        }
                    ),
                    headers=ajax_headers,
                    status=500,
                )
            departures = await response.json()

    return web.Response(
        body=json.dumps(departures),
        headers=ajax_headers,
    )


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="eva to efa gateway")
    parser.add_argument("--port", type=int, metavar="PORT", default=8080)
    parser.add_argument("--prefix", type=str, metavar="PATH", default="/")
    parser.add_argument(
        "--log-level",
        metavar="LEVEL",
        choices=["debug", "info", "warning", "error"],
        default="warning",
        help="Set log level",
    )
    args = parser.parse_args()

    if args.log_level:
        numeric_level = getattr(logging, args.log_level.upper(), None)
        if not isinstance(numeric_level, int):
            print(f"Invalid log level: {args.log_level}", file=sys.stderr)
            sys.exit(1)
        logging.basicConfig(level=numeric_level)

    apis = TransportAPIs()

    app = web.Application()
    app.router.add_get(args.prefix, show_landing_page)
    app.router.add_get(f"{args.prefix}board/{{eva}}", show_departure_board)
    app.router.add_post(f"{args.prefix}geolocation", ajax_geolocation)
    app.router.add_get(f"{args.prefix}find/stop", redirect_to_departure_board)
    app.router.add_static(f"{args.prefix}static", "static")
    web.run_app(app, host="localhost", port=args.port)