#!/usr/bin/env python3
# vim:tabstop=4 softtabstop=4 shiftwidth=4 textwidth=160 smarttab expandtab colorcolumn=160
#
# Copyright (C) 2021 Daniel Friesel
#
# SPDX-License-Identifier: BSD-2-Clause

"""eva to efa gateway.

aiohttp web application that renders departure boards and trip details from a
db-rest style HTTP API (see ``NVM_DB_REST_API``) and, where a matching EFA
endpoint is known for the stop's location, fills in missing platform
information from that EFA server.
"""

import aiohttp
from aiohttp import web
import argparse
from datetime import datetime
import dateutil.parser
import geojson
from jinja2 import Environment, FileSystemLoader, select_autoescape
import logging
import json
import os
import shapely.geometry
import sys
from urllib.parse import quote

# Response headers for rendered HTML pages.
headers = {
    "Access-Control-Allow-Origin": "*",
    "Content-Type": "text/html; charset=utf-8",
}

# Response headers for JSON (AJAX) replies.
ajax_headers = {
    "Access-Control-Allow-Origin": "*",
    "Content-Type": "application/json; charset=utf-8",
}

# Base URL of the REST backend; can be overridden via the environment.
db_rest_api = os.getenv("NVM_DB_REST_API", "https://v5.db.transport.rest")

env = Environment(loader=FileSystemLoader("templates"), autoescape=select_autoescape())

# Both are assigned in the __main__ section before the application starts.
apis = None
nvm_version = None


class EFA:
    """Client for the ``XML_DM_REQUEST`` endpoint of an EFA server."""

    def __init__(self, url):
        """Prepare the request template for the EFA server rooted at *url*."""
        self.url = url + "/XML_DM_REQUEST"
        self.post_data = {
            "command": "",
            "deleteAssignedStops_dm": "1",
            "help": "Hilfe",
            "itdLPxx_id_dm": ":dm",
            "itdLPxx_mapState_dm": "",
            "itdLPxx_mdvMap2_dm": "",
            "itdLPxx_mdvMap_dm": "3406199:401077:NAV3",
            "itdLPxx_transpCompany": "vrr",
            "itdLPxx_view": "",
            "language": "de",
            "mode": "direct",
            "nameInfo_dm": "invalid",
            "nameState_dm": "empty",
            "outputFormat": "JSON",
            "ptOptionsActive": "1",
            "requestID": "0",
            "reset": "neue Anfrage",
            "sessionID": "0",
            "submitButton": "anfordern",
            "typeInfo_dm": "invalid",
            "type_dm": "stop",
            "useProxFootSearch": "0",
            "useRealtime": "1",
        }

    async def get_departures(self, place, name, ts):
        """Request departures for stop *name* (optionally in *place*) at time *ts*.

        Args:
            place: Place name sent as ``place_dm``; ``None`` omits all place fields.
            name: Stop name sent as ``name_dm``.
            ts: Object with ``day``/``month``/``year``/``hour``/``minute`` attributes
                (the caller in this file passes a ``datetime``).

        Returns:
            A list of ``EFADeparture`` objects; empty if the response carries no
            ``departureList``.

        Raises:
            RuntimeError: The response body is not valid JSON. The aiohttp response
                object is the exception's only argument.
        """
        self.post_data.update(
            {
                "itdDateDay": ts.day,
                "itdDateMonth": ts.month,
                "itdDateYear": ts.year,
                "itdTimeHour": ts.hour,
                "itdTimeMinute": ts.minute,
                "name_dm": name,
            }
        )
        if place is None:
            # Drop place fields that an earlier call on this instance may have set.
            for key in ("placeInfo_dm", "placeState_dm", "place_dm"):
                self.post_data.pop(key, None)
        else:
            self.post_data.update(
                {"placeInfo_dm": "invalid", "placeState_dm": "empty", "place_dm": place}
            )

        logging.debug(f"Requesting '{place}' '{name}' from {self.url}")
        async with aiohttp.ClientSession() as session:
            async with session.post(self.url, data=self.post_data) as response:
                # EFA may return JSON with a text/html Content-Type, which response.json() does not like.
                try:
                    departures = json.loads(await response.text())
                except json.JSONDecodeError:
                    raise RuntimeError(response) from None

        if (
            departures is None
            or "departureList" not in departures
            or departures["departureList"] is None
        ):
            logging.debug("EFA response has no departureList")
            return list()

        return [EFADeparture(entry) for entry in departures["departureList"]]


class EFADeparture:
    """One entry of an EFA ``departureList``, reduced to the fields used for matching."""

    def __init__(self, data):
        """Extract line, platform, direction and scheduled time from *data* (a decoded JSON object)."""
        self.line = data["servingLine"]["symbol"]
        self.train_no = data["servingLine"].get("trainNum", None)
        self.occupancy = data.get("occupancy", None)
        self.platform = data.get("platform", None)  # platformName?
        self.direction = data["servingLine"].get("direction", None)

        # Ensure compatibility with DB HAFAS
        if self.line.startswith("U") and " " not in self.line:
            self.line = "U " + self.line[1:]

        # Named "when" rather than "datetime" so the imported datetime class is not shadowed.
        when = data["dateTime"]
        year = int(when["year"])
        month = int(when["month"])
        day = int(when["day"])
        hour = int(when["hour"])
        minute = int(when["minute"])

        # Same format as Departure.iso8601, so the two can be compared as strings.
        self.iso8601 = f"{year:04d}-{month:02d}-{day:02d}T{hour:02d}:{minute:02d}"

    def __repr__(self):
        return "EFADeparture"


class TransportAPIs:
    """Index of EFA endpoints and the geographic areas they provide realtime data for."""

    def __init__(self):
        """Load every API description below ``ext/transport-apis/data/de``.

        Only entries whose ``type`` marks them as EFA and that declare
        ``coverage.realtimeCoverage.area`` are kept.
        """
        self.apis = list()
        base = "ext/transport-apis/data/de"
        for filename in os.listdir(base):
            # Explicit encoding: do not depend on the process locale when reading JSON.
            with open(f"{base}/{filename}", "r", encoding="utf-8") as f:
                data = json.load(f)
            if data["type"].get("efa", False):
                try:
                    area = data["coverage"]["realtimeCoverage"]["area"]
                except KeyError:
                    continue
                # surely there must be a more elegant way to load a JSON sub-dict as GeoJSON
                area = geojson.loads(json.dumps(area))
                self.apis.append((data["options"], shapely.geometry.shape(area)))

    def get_efa(self, location):
        """Return the ``options`` of the first API whose area contains *location*.

        Args:
            location: Coordinate tuple passed to ``shapely.geometry.Point`` (this
                file passes ``(longitude, latitude)``), or ``None``.

        Returns:
            The matching ``options`` dict, or ``None`` if *location* is ``None`` or
            no area contains it.
        """
        if location is None:
            # Departure.location is None when the backend omits coordinates.
            return None
        point = shapely.geometry.Point(*location)
        for api, area in self.apis:
            if area.contains(point):
                return api
        return None


class Trip:
    """A single trip as returned by the ``/trips/{id}`` endpoint, including its stopovers."""

    def __init__(self, obj):
        """Build the trip from the decoded JSON object *obj*.

        The four timestamp fields are parsed into ``datetime`` objects (``None``
        when the value is not parseable as a string), then every key of *obj*
        becomes an attribute.
        """
        for key in "departure plannedDeparture arrival plannedArrival".split():
            try:
                obj[key] = dateutil.parser.parse(obj[key])
            except TypeError:
                obj[key] = None

        self.cancelled = None
        self.__dict__.update(obj)

        self.direction_area = None
        self.direction_stop = None

        # direction becomes a 2-tuple: (part before the first comma, remainder or None).
        if self.direction is not None and "," in self.direction:
            self.direction = tuple(self.direction.split(",", maxsplit=1))
        else:
            self.direction = self.direction, None

        self.stopovers = list(map(Stopover, self.stopovers))
        Stopover.resolve_names(self.stopovers)

        self.where = None
        self.quoted_where = None
        self.platform = None
        self.plannedPlatform = None
        self.platform_changed = None

    def set_ref(self, stop_name):
        """Mark *stop_name* as the reference stop of this trip.

        Copies arrival/departure/platform data of the matching stopover onto the
        trip and tells every stopover whether it lies before (-1), at (0) or
        after (1) the reference stop.
        """
        self.where = stop_name
        self.quoted_where = quote(self.where)
        ref_pos = -1
        for stopover in self.stopovers:
            if stopover.stop["name"] == stop_name:
                ref_pos = 0
                self.arrival = stopover.arrival
                self.arrivalDelay = stopover.arrivalDelay
                self.plannedArrival = stopover.plannedArrival
                self.departure = stopover.departure
                self.departureDelay = stopover.departureDelay
                self.plannedDeparture = stopover.plannedDeparture
                self.platform = stopover.platform
                self.plannedPlatform = stopover.plannedPlatform
                self.platform_changed = stopover.platform_changed
            elif ref_pos == 0:
                ref_pos = 1
            stopover.set_ref(ref_pos)

    def set_relative(self, now_ts):
        """Update ``is_future`` of every stopover relative to the POSIX timestamp *now_ts*."""
        for stopover in self.stopovers:
            stopover.set_relative(now_ts)


class Stopover:
    """One stop along a trip."""

    def __init__(self, obj):
        """Build the stopover from the decoded JSON object *obj*.

        Timestamps are parsed like in ``Trip``; every key of *obj* becomes an
        attribute. ``platform`` / ``plannedPlatform`` prefer the arrival value and
        fall back to the departure value.
        """
        for key in "departure plannedDeparture arrival plannedArrival".split():
            try:
                obj[key] = dateutil.parser.parse(obj[key])
            except TypeError:
                obj[key] = None

        self.cancelled = None
        self.__dict__.update(obj)

        self.name_area = None
        self.name_stop = None

        # name becomes a 2-tuple: (part before the first comma, remainder or None).
        if "," in self.stop["name"]:
            self.name = tuple(self.stop["name"].split(",", maxsplit=1))
        else:
            self.name = self.stop["name"], None

        if self.arrivalPlatform:
            self.platform = self.arrivalPlatform
        elif self.departurePlatform:
            self.platform = self.departurePlatform
        else:
            self.platform = None

        if self.plannedArrivalPlatform:
            self.plannedPlatform = self.plannedArrivalPlatform
        elif self.plannedDeparturePlatform:
            self.plannedPlatform = self.plannedDeparturePlatform
        else:
            self.plannedPlatform = None

        self.platform_changed = self.platform != self.plannedPlatform

        self.is_requested_stop = False
        self.ref_pos = None
        self.is_future = None

    def set_ref(self, ref_pos):
        """Select which times to display, based on the position relative to the reference stop.

        Args:
            ref_pos: ``<= 0`` uses departure times, ``> 0`` uses arrival times;
                exactly ``0`` additionally flags this stopover as the requested stop.
        """
        self.ref_pos = ref_pos
        if ref_pos <= 0:
            self.when = self.departure
            self.plannedWhen = self.plannedDeparture
            self.delay = self.departureDelay
        else:
            self.when = self.arrival
            self.plannedWhen = self.plannedArrival
            self.delay = self.arrivalDelay
        if ref_pos == 0:
            self.is_requested_stop = True

    def set_relative(self, now_ts):
        """Set ``is_future`` by comparing arrival (or else departure) with *now_ts*.

        ``is_future`` is left untouched when neither time is known.
        """
        if self.arrival is not None:
            self.is_future = self.arrival.timestamp() > now_ts
        elif self.departure is not None:
            self.is_future = self.departure.timestamp() > now_ts

    @staticmethod
    def resolve_names(stopovers):
        """Split each stopover's name into ``name_area`` and ``name_stop``.

        The side of the comma with fewer distinct values across *stopovers* is
        taken to be the area; on a tie the part before the comma is the area.
        Names without a comma get ``name_area = None``.
        """
        prefixes = set()
        suffixes = set()
        for stopover in stopovers:
            if stopover.name[1] is not None:
                prefixes.add(stopover.name[0])
                suffixes.add(stopover.name[1])

        if len(prefixes) > len(suffixes):
            area_index, stop_index = 1, 0
        else:
            area_index, stop_index = 0, 1

        for stopover in stopovers:
            if stopover.name[1] is None:
                stopover.name_area = None
                stopover.name_stop = stopover.name[0]
            else:
                stopover.name_area = stopover.name[area_index]
                stopover.name_stop = stopover.name[stop_index]


class Departure:
    """One row of a departure board."""

    def __init__(self, obj):
        """Build the departure from the decoded JSON object *obj*.

        Every key of *obj* becomes an attribute; ``when`` / ``plannedWhen`` are
        parsed into ``datetime`` objects, ``delay`` (if non-zero) is converted
        from seconds to a signed minute string, and URL-quoted variants of line
        and stop name are precomputed for link generation.
        """
        self.__dict__.update(obj)

        if "cancelled" not in obj:
            self.cancelled = False

        self.classes = str()

        self.station_name = None
        self.stop_name = obj.get("stop", dict()).get("name", None)
        self.station_name = obj.get("station", dict()).get("name", self.stop_name)

        try:
            self.location = (
                obj["stop"]["location"]["longitude"],
                obj["stop"]["location"]["latitude"],
            )
        except KeyError:
            self.location = None

        self.direction_area = None
        self.direction_stop = None

        # direction becomes a 2-tuple: (part before the first comma, remainder or None).
        if self.direction is not None and "," in self.direction:
            self.direction = tuple(self.direction.split(",", maxsplit=1))
        else:
            self.direction = self.direction, None

        if "line" in obj:
            self.line = Line(self.line)

        try:
            self.when = dateutil.parser.parse(self.when)
        except TypeError:
            self.when = None

        try:
            self.plannedWhen = dateutil.parser.parse(self.plannedWhen)
            self.iso8601 = self.plannedWhen.strftime("%Y-%m-%dT%H:%M")
        except TypeError:
            self.plannedWhen = None
            # Must be assigned: add_efa() and the board handler read this attribute.
            self.iso8601 = None

        if self.cancelled:
            self.classes += " cancelled"

        if self.when:
            self.sort_by = self.when.timestamp()
        elif self.plannedWhen:
            self.sort_by = self.plannedWhen.timestamp()
        else:
            self.sort_by = 0

        if self.delay:
            self.delay = self.delay // 60
            self.delay = f"{self.delay:+.0f}"

        self.quoted_line_name = quote(self.line.name)
        # quote() raises TypeError for None, so fall back to an empty string.
        self.quoted_stop_name = quote(self.stop_name) if self.stop_name else ""

    def quoted_platform(self):
        """Return the URL-quoted platform, or an empty string if none is known."""
        if self.platform:
            return quote(self.platform)
        return ""

    def __repr__(self):
        return "Departure"

    def set_relative(self, now):
        """Set ``relativeWhen`` to a human-readable time until departure.

        Args:
            now: Current time as POSIX timestamp.
        """
        minutes = (self.sort_by - now) // 60
        if minutes < 1:
            self.relativeWhen = "sofort"
        elif minutes < 60:
            self.relativeWhen = f"{minutes:.0f} min"
        else:
            self.relativeWhen = f"{minutes//60:.0f}h {minutes%60:.0f}min"

    def add_efa(self, candidates):
        """Merge data from the EFA departure matching this one, if it is unambiguous.

        A candidate matches when both its ``iso8601`` time and its line equal
        this departure's; nothing happens unless exactly one candidate matches.
        """
        dest_candidates = [
            candidate
            for candidate in candidates
            if candidate.iso8601 == self.iso8601 and candidate.line == self.line.name
        ]
        if len(dest_candidates) == 1:
            self._add_efa(dest_candidates[0])
        # else: TODO check destination

    def _add_efa(self, efa_departure):
        """Take over the EFA platform if this departure has none."""
        if efa_departure.platform and not self.platform:
            self.platform = efa_departure.platform

    @staticmethod
    def resolve_directions(departures):
        """Split each departure's direction into ``direction_area`` and ``direction_stop``.

        Same heuristic as ``Stopover.resolve_names``: the side of the comma with
        fewer distinct values is the area; on a tie the part before the comma is.
        """
        prefixes = set()
        suffixes = set()
        for departure in departures:
            if departure.direction[1] is not None:
                prefixes.add(departure.direction[0])
                suffixes.add(departure.direction[1])

        if len(prefixes) > len(suffixes):
            area_index, stop_index = 1, 0
        else:
            area_index, stop_index = 0, 1

        for departure in departures:
            if departure.direction[1] is None:
                departure.direction_area = None
                departure.direction_stop = departure.direction[0]
            else:
                departure.direction_area = departure.direction[area_index]
                departure.direction_stop = departure.direction[stop_index]


class Line:
    """Line information of a departure, with a CSS class derived from its product type."""

    def __init__(self, obj):
        """Copy all keys of *obj* to attributes, set ``css_class`` and strip redundant name prefixes."""
        self.__dict__.update(obj)

        self.css_class = str()

        if self.product.startswith("national"):
            self.css_class = "longdistance"
        elif self.product == "tram":
            # str.removeprefix requires Python ≥ 3.9
            if self.name.startswith("STR "):
                self.name = self.name[4:]
            self.css_class = "tram"
        elif self.product == "suburban":
            self.css_class = "suburban"
        elif self.product == "subway":
            self.css_class = "subway"
        elif self.product == "bus":
            if self.name.startswith("Bus "):
                self.name = self.name[4:]
            elif self.name.startswith("Bus"):
                self.name = self.name[3:]
            self.css_class = "bus"

    def __repr__(self):
        return self.name


# The meta handlers are coroutines: aiohttp deprecates plain functions as request handlers.


async def meta_about(request):
    """Render the "about" page."""
    about = env.get_template("about.html")
    return web.Response(
        body=about.render(version=nvm_version),
        headers=headers,
    )


async def meta_imprint(request):
    """Render the imprint page."""
    about = env.get_template("imprint.html")
    return web.Response(
        body=about.render(version=nvm_version),
        headers=headers,
    )


async def meta_privacy(request):
    """Render the privacy page."""
    about = env.get_template("privacy.html")
    return web.Response(
        body=about.render(version=nvm_version),
        headers=headers,
    )


async def show_trip_info(request, trip_id=None):
    """Render the detail page of a single trip.

    Args:
        request: The aiohttp request. Query parameter ``highlight`` names the
            reference stop; ``platform`` is used as platform if the trip data
            has none.
        trip_id: Trip ID; taken from the ``tripid`` route parameter when ``None``.
    """
    if trip_id is None:
        trip_id = request.match_info.get("tripid")

    # Percent-encode the ID so that "/", "?" or "#" inside it cannot alter the request path.
    request_url = f"{db_rest_api}/trips/{quote(trip_id, safe='')}?lineName=0"
    logging.debug(f"Requesting trip info from {request_url}")
    async with aiohttp.ClientSession() as session:
        async with session.get(request_url) as response:
            tripinfo = await response.json()

    tripinfo = Trip(tripinfo)

    if request.query.get("highlight", None):
        tripinfo.set_ref(request.query.get("highlight"))

    if not tripinfo.platform and request.query.get("platform", None):
        tripinfo.platform = request.query.get("platform")

    now = datetime.now()
    now_ts = now.timestamp()
    tripinfo.set_relative(now_ts)

    tripinfo_page = env.get_template("tripinfo_page.html")

    if tripinfo.direction and tripinfo.direction[0] and tripinfo.direction[1]:
        page_title = tripinfo.line["name"] + " ➔ " + ", ".join(tripinfo.direction)
    elif tripinfo.direction and tripinfo.direction[0]:
        page_title = tripinfo.line["name"] + " ➔ " + tripinfo.direction[0]
    else:
        page_title = tripinfo.line["name"]

    return web.Response(
        body=tripinfo_page.render(
            title=page_title,
            tripinfo=tripinfo,
            version=nvm_version,
        ),
        headers=headers,
    )


async def show_departure_board(request, eva=None):
    """Render the departure board of one stop.

    Args:
        request: The aiohttp request. A truthy ``ajax`` query parameter selects
            the bare departure list template instead of the full page.
        eva: Stop ID; taken from the ``eva`` route parameter (which must be
            numeric) when ``None``.

    Raises:
        web.HTTPBadRequest: The ``eva`` route parameter is not a number.
        web.HTTPNotFound: The backend answered with an error object.
    """
    if eva is None:
        try:
            eva = int(request.match_info.get("eva"))
        except ValueError:
            # aiohttp HTTP exceptions are raised; returning them is deprecated.
            raise web.HTTPBadRequest(text="EVA must be a number at the moment")

    request_url = f"{db_rest_api}/stops/{eva}/departures?results=60&duration=120&stopovers=true&language=de"
    logging.debug(f"Requesting '{eva}' departures from {request_url}")
    async with aiohttp.ClientSession() as session:
        async with session.get(request_url) as response:
            departures = await response.json()

    if isinstance(departures, dict) and departures.get("error", False):
        raise web.HTTPNotFound(body=json.dumps(departures), headers=headers)

    now = datetime.now()
    now_ts = now.timestamp()

    departures = list(map(Departure, departures))
    Departure.resolve_directions(departures)

    # The most frequent station name among the departures becomes the page title.
    station_name_freq = dict()
    for departure in departures:
        departure.set_relative(now_ts)
        station_name_freq[departure.station_name] = (
            station_name_freq.get(departure.station_name, 0) + 1
        )

    if station_name_freq:
        station_name = max(station_name_freq.keys(), key=lambda k: station_name_freq[k])
    else:
        station_name = "NVM"

    efa_by_iso8601 = dict()
    warning = None

    # station_name is None when the backend supplied neither station nor stop name.
    if len(departures) and station_name is not None and ", " in station_name:
        # maxsplit=1: names containing more than one ", " must not break the unpacking.
        name, place = station_name.split(", ", 1)
        efa_endpoint = apis.get_efa(departures[0].location)
        if efa_endpoint:
            efa = EFA(efa_endpoint["endpoint"])
            try:
                efa_departures = await efa.get_departures(place, name, now)
                for departure in efa_departures:
                    if departure.iso8601 not in efa_by_iso8601:
                        efa_by_iso8601[departure.iso8601] = list()
                    efa_by_iso8601[departure.iso8601].append(departure)
            except RuntimeError as e:
                # EFA.get_departures passes the failed response as sole exception argument.
                (response,) = e.args
                text = await response.text()
                warning = {
                    "lead": "Detailabfrage fehlgeschlagen:",
                    "body": "Angaben sind möglicherweise unvollständig",
                    "code": f"""EFA server {efa_endpoint["endpoint"]} returned HTTP {response.status} '{text[:10224]}'""",
                }

    for departure in departures:
        departure.add_efa(efa_by_iso8601.get(departure.iso8601, list()))

    departures = sorted(departures, key=lambda departure: departure.sort_by)

    if request.query.get("ajax", False):
        departure_board = env.get_template("departure_list.html")
    else:
        departure_board = env.get_template("departures_page.html")

    return web.Response(
        body=departure_board.render(
            title=station_name,
            departures=departures,
            warning=warning,
            version=nvm_version,
        ),
        headers=headers,
    )


async def redirect_to_departure_board(request):
    """Look up stops by the ``name`` query parameter.

    An exact (case-insensitive) name match is answered directly with that
    stop's departure board; otherwise a list of candidate stops is rendered.

    Raises:
        web.HTTPBadRequest: The ``name`` query parameter is missing.
    """
    stop_name = request.query.get("name")
    if stop_name is None:
        raise web.HTTPBadRequest(text="Missing query parameter: name")

    request_url = f"{db_rest_api}/locations"
    # Passing the user's input via params= lets aiohttp encode it; "&" or "#" in it cannot inject parameters.
    request_params = {"query": stop_name, "poi": "false", "addresses": "false"}
    logging.debug(f"Requesting stops matching '{stop_name}' from {request_url}")
    async with aiohttp.ClientSession() as session:
        async with session.get(request_url, params=request_params) as response:
            if response.status != 200:
                error = await response.text()
                landing_page = env.get_template("landing_page.html")
                return web.Response(
                    body=landing_page.render(
                        title="NVM",
                        error={
                            "lead": "Haltestellensuche fehlgeschlagen",
                            "body": "",
                            "code": f"Server returned HTTP {response.status} '{error[:10224]}'",
                        },
                        version=nvm_version,
                    ),
                    headers=headers,
                    status=500,
                )
            stops = await response.json()

    for stop in stops:
        if stop_name.lower() == stop["name"].lower():
            return await show_departure_board(request, stop["id"])

    stops_page = env.get_template("stops.html")
    return web.Response(
        body=stops_page.render(
            title=f"Suche nach „{stop_name}“", stops=stops, version=nvm_version
        ),
        headers=headers,
    )


async def show_landing_page(request):
    """Render the landing page."""
    landing_page = env.get_template("landing_page.html")
    return web.Response(
        body=landing_page.render(title="NVM", version=nvm_version),
        headers=headers,
    )


async def ajax_geolocation(request):
    """Return stops near the coordinates given in the JSON request body.

    The body must be a JSON object with ``lat`` and ``lon``. The reply is the
    backend's JSON answer, or ``{"error": true, "msg": ...}`` with status 400
    (malformed request) or 500 (backend failure).
    """
    try:
        request_data = await request.json()
        lat = request_data["lat"]
        lon = request_data["lon"]
    except (json.JSONDecodeError, KeyError, TypeError):
        return web.Response(
            body=json.dumps(
                {
                    "error": True,
                    "msg": "Request body must be a JSON object with 'lat' and 'lon'",
                }
            ),
            headers=ajax_headers,
            status=400,
        )

    request_url = f"{db_rest_api}/stops/nearby"
    # Client-supplied values go through params= so they are encoded and cannot inject parameters.
    request_params = {"latitude": str(lat), "longitude": str(lon), "results": "10"}
    logging.debug(f"Requesting stops near {lat}/{lon} from {request_url}")
    async with aiohttp.ClientSession() as session:
        async with session.get(request_url, params=request_params) as response:
            if response.status != 200:
                text = await response.text()
                return web.Response(
                    body=json.dumps(
                        {
                            "error": True,
                            "msg": f"HTTP {response.status} '{text[:1024]}'",
                        }
                    ),
                    headers=ajax_headers,
                    status=500,
                )
            departures = await response.json()

    return web.Response(
        body=json.dumps(departures),
        headers=ajax_headers,
    )


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="eva to efa gateway")
    parser.add_argument("--port", type=int, metavar="PORT", default=8080)
    parser.add_argument("--prefix", type=str, metavar="PATH", default="/")
    parser.add_argument(
        "--log-level",
        metavar="LEVEL",
        choices=["debug", "info", "warning", "error"],
        default="warning",
        help="Set log level",
    )
    parser.add_argument("--version-str", type=str, metavar="VERSION", default="git")
    args = parser.parse_args()

    if args.log_level:
        numeric_level = getattr(logging, args.log_level.upper(), None)
        if not isinstance(numeric_level, int):
            print(f"Invalid log level: {args.log_level}", file=sys.stderr)
            sys.exit(1)
        logging.basicConfig(level=numeric_level)

    apis = TransportAPIs()
    nvm_version = args.version_str

    app = web.Application()
    app.router.add_get(args.prefix, show_landing_page)
    app.router.add_get(f"{args.prefix}board/{{eva}}", show_departure_board)
    app.router.add_post(f"{args.prefix}geolocation", ajax_geolocation)
    app.router.add_get(f"{args.prefix}find/stop", redirect_to_departure_board)
    app.router.add_get(f"{args.prefix}meta/about", meta_about)
    app.router.add_get(f"{args.prefix}meta/imprint", meta_imprint)
    app.router.add_get(f"{args.prefix}meta/privacy", meta_privacy)
    app.router.add_get(f"{args.prefix}trip/{{tripid}}", show_trip_info)
    app.router.add_static(f"{args.prefix}static", "static")
    web.run_app(app, host="localhost", port=args.port)