1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
#!/usr/bin/env python3
# vim:tabstop=4 softtabstop=4 shiftwidth=4 textwidth=160 smarttab expandtab colorcolumn=160
#
# Copyright (C) 2021 Daniel Friesel
#
# SPDX-License-Identifier: BSD-2-Clause
import argparse
import aiohttp
from aiohttp import web
from datetime import datetime
import json
headers = {
"Access-Control-Allow-Origin": "*",
"Content-Type": "application/json; charset=utf-8",
}
occupancy_map = {"MANY_SEATS": 1, "FEW_SEATS": 2, "STANDING_ONLY": 3}
occupancy_cache = dict()
eva_to_name = dict()
def get_occupancy(occupancy):
try:
return occupancy_map[occupancy]
except KeyError:
return None
def build_occupancy_response(content, eva, now):
train_data = dict()
for train in content["raw"]:
if train["train_no"]:
train_data[train["train_no"]] = {
"occupancy": get_occupancy(train["occupancy"])
}
reply = {"train": train_data}
occupancy_cache[eva] = (now, reply)
return web.Response(body=json.dumps(reply), headers=headers)
async def get_occupancy_by_eva(request):
try:
eva = int(request.match_info.get("eva"))
except ValueError:
return web.HTTPBadRequest(text="EVA must be a number")
try:
station, optid = eva_to_name[eva]
except KeyError:
return web.HTTPNotFound(text="Unknown EVA")
now = datetime.now().timestamp()
if eva in occupancy_cache and now - occupancy_cache[eva][0] < 300:
return web.Response(body=json.dumps(occupancy_cache[eva][1]), headers=headers)
async with aiohttp.ClientSession() as session:
async with session.get(
f"https://vrrf.finalrewind.org/{station}.json?line=RE,RB,S&backend=efa.VRR2"
) as response:
content = await response.json()
if not content["error"]:
return build_occupancy_response(content, eva, now)
async with aiohttp.ClientSession() as session:
async with session.get(
f"https://vrrf.finalrewind.org/{optid}.json?line=RE,RB,S&backend=efa.VRR2&proximity_search=1"
) as response:
content = await response.json()
return build_occupancy_response(content, eva, now)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="eva to efa gateway")
parser.add_argument("--port", type=int, metavar="PORT", default=8080)
parser.add_argument("--prefix", type=str, metavar="PATH", default="/")
args = parser.parse_args()
with open("share/vrr.json", "r") as f:
for eva, (name, optid) in json.load(f).items():
eva_to_name[int(eva)] = name, optid
app = web.Application()
app.add_routes(
[web.get(f"{args.prefix}occupancy-by-eva/{{eva}}.json", get_occupancy_by_eva)]
)
web.run_app(app, host="localhost", port=args.port)
|