#!/usr/bin/env python3
# vim:tabstop=4 softtabstop=4 shiftwidth=4 textwidth=160 smarttab expandtab colorcolumn=160
#
# Copyright (C) 2021 Birte Kristina Friesel
#
# SPDX-License-Identifier: BSD-2-Clause

"""HTTP gateway that forwards departure and occupancy lookups to an EFA backend."""

import argparse
import asyncio
import json
from datetime import datetime

import aiohttp
from aiohttp import web

# Headers attached to every successful JSON reply produced by this service.
headers = {
    "Access-Control-Allow-Origin": "*",
    "Content-Type": "application/json; charset=utf-8",
}

# Occupancy labels found in EFA departures, mapped to the numeric level served to clients.
occupancy_map = {"MANY_SEATS": 1, "FEW_SEATS": 2, "STANDING_ONLY": 3}

# EVA number -> (unix timestamp of the lookup, reply dict).
occupancy_cache = {}

# EVA number -> (station name, optid); filled from share/vrr.json when run as a script.
eva_to_name = {}

# EFA instance queried by the occupancy-by-EVA routes.
_VRR_EFA_URL = "https://app.vrr.de/standard"

# Maximum age (in seconds) of an occupancy_cache entry that is still served.
_CACHE_TTL_SECONDS = 300

# Failures of a backend round trip: transport errors, timeouts, and undecodable / non-JSON bodies
# (json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses).
_UPSTREAM_ERRORS = (aiohttp.ClientError, asyncio.TimeoutError, ValueError)


class EFA:
    """Small client for the ``XML_DM_REQUEST`` endpoint of an EFA server."""

    def __init__(self, url, proximity_search=False):
        """Prepare the request template.

        Args:
            url: Base URL of the EFA instance; ``/XML_DM_REQUEST`` is appended to it.
            proximity_search: When true, ``useProxFootSearch`` is sent as ``"1"`` instead of ``"0"``.
        """
        self.dm_url = url + "/XML_DM_REQUEST"
        self.dm_post_data = {
            "command": "",
            "deleteAssignedStops_dm": "1",
            "help": "Hilfe",
            "itdLPxx_id_dm": ":dm",
            "itdLPxx_mapState_dm": "",
            "itdLPxx_mdvMap2_dm": "",
            "itdLPxx_mdvMap_dm": "3406199:401077:NAV3",
            "itdLPxx_transpCompany": "vrr",
            "itdLPxx_view": "",
            "language": "de",
            "mode": "direct",
            "nameInfo_dm": "invalid",
            "nameState_dm": "empty",
            "outputFormat": "JSON",
            "ptOptionsActive": "1",
            "requestID": "0",
            "reset": "neue Anfrage",
            "sessionID": "0",
            "submitButton": "anfordern",
            "typeInfo_dm": "invalid",
            "type_dm": "stop",
            "useProxFootSearch": "1" if proximity_search else "0",
            "useRealtime": "1",
        }

    async def get_departures(self, place, name, ts):
        """POST a departure request and return the decoded JSON reply.

        Args:
            place: Value for ``place_dm``; ``None`` omits all place fields from the request.
            name: Value for ``name_dm``.
            ts: ``datetime`` whose day/month/year/hour/minute are sent as the request time.

        Returns:
            Whatever ``json.loads`` yields for the response body.

        Raises:
            aiohttp.ClientError: If the HTTP round trip fails.
            ValueError: If the body cannot be decoded as text or parsed as JSON.
        """
        # Build the form per call so request-specific fields never leak into the instance template.
        post_data = {
            **self.dm_post_data,
            "itdDateDay": ts.day,
            "itdDateMonth": ts.month,
            "itdDateYear": ts.year,
            "itdTimeHour": ts.hour,
            "itdTimeMinute": ts.minute,
            "name_dm": name,
        }
        if place is None:
            for key in ("placeInfo_dm", "placeState_dm", "place_dm"):
                post_data.pop(key, None)
        else:
            post_data.update(
                {"placeInfo_dm": "invalid", "placeState_dm": "empty", "place_dm": place}
            )

        async with aiohttp.ClientSession() as session:
            async with session.post(self.dm_url, data=post_data) as response:
                # EFA may return JSON with a text/html Content-Type, which response.json() does not like.
                return json.loads(await response.text())


async def get_departures(request):
    """Handle ``departures``: proxy a departure request to the EFA instance given in the query string.

    Query parameters: ``url`` (required), ``name`` (required), ``place`` (optional).

    Raises:
        web.HTTPBadRequest: If ``url`` or ``name`` is missing or ``url`` is not a valid URL.
        web.HTTPBadGateway: If the backend cannot be reached or does not answer with JSON.
    """
    efa_url = request.query.get("url")
    place = request.query.get("place")
    name = request.query.get("name")

    # aiohttp deprecates *returning* HTTP exceptions from a handler; raise them instead.
    if not efa_url:
        raise web.HTTPBadRequest(text="missing url=... parameter")
    if not name:
        raise web.HTTPBadRequest(text="missing name=... parameter")

    # NOTE(review): the target URL is taken verbatim from the caller, so this endpoint will POST to
    # arbitrary hosts on the caller's behalf. Consider an allow-list if it is reachable by untrusted clients.
    try:
        reply = await EFA(efa_url).get_departures(place, name, datetime.now())
    except aiohttp.InvalidURL as exc:
        raise web.HTTPBadRequest(text="invalid url=... parameter") from exc
    except _UPSTREAM_ERRORS as exc:
        raise web.HTTPBadGateway(text="EFA request failed") from exc

    return web.Response(body=json.dumps(reply), headers=headers)


def get_occupancy(occupancy):
    """Return the numeric level for an occupancy label, or ``None`` if the label is unknown."""
    return occupancy_map.get(occupancy)


def build_occupancy_response(departure_list, eva, now):
    """Build the per-train occupancy reply, store it in ``occupancy_cache`` and wrap it in a response.

    Args:
        departure_list: Iterable of departure dicts from an EFA reply.
        eva: Cache key under which the reply is stored.
        now: Timestamp stored alongside the reply for later expiry checks.

    Returns:
        A ``web.Response`` whose body is ``{"train": {train number: {"occupancy": level}}}`` as JSON.
    """
    train_data = {}
    for departure in departure_list:
        try:
            occupancy = departure["occupancy"]
            train_no = departure["servingLine"]["trainNum"]
        except KeyError:
            # Departures lacking occupancy data or a train number are left out of the reply.
            continue
        train_data[train_no] = {"occupancy": get_occupancy(occupancy)}

    reply = {"train": train_data}
    occupancy_cache[eva] = (now, reply)
    return web.Response(body=json.dumps(reply), headers=headers)


async def get_occupancy_by_eva(request):
    """Handle ``{eva}.json``: return train occupancy for the station registered under an EVA number.

    A cached reply younger than ``_CACHE_TTL_SECONDS`` is served as-is. Otherwise the station name is
    looked up first; if that yields no departures, the optid is queried with proximity search enabled.

    Raises:
        web.HTTPBadRequest: If the EVA path segment is not an integer.
        web.HTTPNotFound: If the EVA number is not present in ``eva_to_name``.
        web.HTTPBadGateway: If the backend cannot be reached or does not answer with JSON.
    """
    try:
        eva = int(request.match_info.get("eva"))
    except ValueError:
        raise web.HTTPBadRequest(text="EVA must be a number") from None

    try:
        station, optid = eva_to_name[eva]
    except KeyError:
        raise web.HTTPNotFound(text="Unknown EVA") from None

    now = datetime.now()
    timestamp = now.timestamp()

    cached = occupancy_cache.get(eva)
    if cached is not None and timestamp - cached[0] < _CACHE_TTL_SECONDS:
        return web.Response(body=json.dumps(cached[1]), headers=headers)

    try:
        reply = await EFA(_VRR_EFA_URL).get_departures(None, station, now)
        if not reply.get("departureList"):
            reply = await EFA(_VRR_EFA_URL, proximity_search=True).get_departures(None, optid, now)
    except _UPSTREAM_ERRORS as exc:
        raise web.HTTPBadGateway(text="EFA request failed") from exc

    # "departureList" may be present but null; `or []` keeps that from reaching the loop as None.
    return build_occupancy_response(reply.get("departureList") or [], eva, timestamp)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="eva to efa gateway")
    parser.add_argument("--port", type=int, metavar="PORT", default=8080)
    parser.add_argument("--prefix", type=str, metavar="PATH", default="/")
    args = parser.parse_args()

    # Read as UTF-8 explicitly so the result does not depend on the process locale.
    with open("share/vrr.json", "r", encoding="utf-8") as f:
        for eva, (name, optid) in json.load(f).items():
            eva_to_name[int(eva)] = name, optid

    app = web.Application()
    app.add_routes(
        [
            # legacy route
            web.get(f"{args.prefix}{{eva}}.json", get_occupancy_by_eva),
            web.get(f"{args.prefix}departures", get_departures),
            web.get(f"{args.prefix}occupancy-by-eva/{{eva}}.json", get_occupancy_by_eva),
        ]
    )

    web.run_app(app, host="localhost", port=args.port)