1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
#!/usr/bin/env python3
# vim:tabstop=4 softtabstop=4 shiftwidth=4 textwidth=160 smarttab expandtab colorcolumn=160
import argparse
import aiohttp
from aiohttp import web
import json
# Headers sent with every successful JSON reply of handle_eva; the wildcard
# CORS origin allows browser clients on any site to read the response.
headers = {
    "Access-Control-Allow-Origin": "*",
    "Content-Type": "application/json; charset=utf-8",
}

# Upstream occupancy labels mapped to the numeric levels used in replies.
occupancy_map = {
    "MANY_SEATS": 1,
    "FEW_SEATS": 2,
    "STANDING_ONLY": 3,
}

# EVA number (int) -> station name; populated by the __main__ block at startup.
eva_to_name = {}
def get_occupancy(occupancy):
    """Translate an upstream occupancy label into its numeric level.

    Returns the matching value from ``occupancy_map``, or ``None`` when the
    label is not a key of that map.
    """
    return occupancy_map.get(occupancy)
async def handle_eva(request):
    """Serve per-train occupancy levels for the station with the requested EVA.

    The ``eva`` route parameter is parsed as an integer and resolved to a
    station name through ``eva_to_name``.  The departures of that station are
    then fetched from vrrf.finalrewind.org, and every entry of the upstream
    ``raw`` list that has a truthy ``train_no`` is reported together with its
    numeric occupancy level (see ``get_occupancy``).

    Args:
        request: The aiohttp request; ``request.match_info`` is expected to
            carry the ``eva`` route parameter.

    Returns:
        A ``web.Response`` whose JSON body has the form
        ``{"train": {<train_no>: {"occupancy": <level or null>}}}``.

    Raises:
        web.HTTPBadRequest: ``eva`` is missing or is not a number.
        web.HTTPNotFound: ``eva`` is not a key of ``eva_to_name``.
        web.HTTPBadGateway: The upstream request failed, or its reply was not
            a JSON object containing a ``raw`` list.
    """
    try:
        # A missing parameter becomes "" so int() rejects it as a bad request
        # instead of failing with a TypeError on None.
        eva = int(request.match_info.get("eva", ""))
    except ValueError:
        # aiohttp HTTP errors are exceptions: they have to be raised, since
        # returning them from a handler is deprecated.
        raise web.HTTPBadRequest(text="EVA must be a number") from None

    try:
        station = eva_to_name[eva]
    except KeyError:
        raise web.HTTPNotFound(text="Unknown EVA") from None

    url = f"https://vrrf.finalrewind.org/{station}.json?line=RE,RB,S&backend=efa.VRR2"
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                payload = await response.text()
    except (aiohttp.ClientError, UnicodeDecodeError) as err:
        # Connection problems or an undecodable body are upstream failures,
        # not errors of this service.
        raise web.HTTPBadGateway(text="Upstream request failed") from err

    try:
        content = json.loads(payload)
    except ValueError as err:  # json.JSONDecodeError is a ValueError
        raise web.HTTPBadGateway(text="Unexpected upstream reply") from err

    trains = content.get("raw") if isinstance(content, dict) else None
    if not isinstance(trains, list):
        raise web.HTTPBadGateway(text="Unexpected upstream reply")

    # Entries without a train number cannot be keyed and are skipped; an
    # absent occupancy field is reported as null like an unknown label.
    train_data = {
        train["train_no"]: {"occupancy": get_occupancy(train.get("occupancy"))}
        for train in trains
        if train.get("train_no")
    }

    reply = {"train": train_data}
    return web.Response(body=json.dumps(reply), headers=headers)
if __name__ == "__main__":
    # Script entry point: parse the command line, load the EVA -> station
    # name table and serve handle_eva on localhost.
    parser = argparse.ArgumentParser(description="eva to efa gateway")
    # NOTE(review): --eva-csv is accepted but never read below; the station
    # table always comes from share/vrr.json. Kept for CLI compatibility.
    parser.add_argument(
        "--eva-csv", type=str, metavar="FILE", default="haltestellenMitEva.csv"
    )
    parser.add_argument("--port", type=int, metavar="PORT", default=8080)
    parser.add_argument("--prefix", type=str, metavar="PATH", default="/")
    args = parser.parse_args()

    # Decode explicitly as UTF-8 so that station names with non-ASCII
    # characters load the same way regardless of the process locale.
    with open("share/vrr.json", "r", encoding="utf-8") as f:
        # Each value is a (city, stop, name) triple; only the name is used.
        for eva, (_city, _stop, name) in json.load(f).items():
            eva_to_name[int(eva)] = name

    app = web.Application()
    # The route is "<prefix><eva>.json"; the doubled braces yield aiohttp's
    # literal "{eva}" placeholder inside the f-string.
    app.add_routes([web.get(f"{args.prefix}{{eva}}.json", handle_eva)])
    web.run_app(app, host="localhost", port=args.port)
|