summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBirte Kristina Friesel <derf@finalrewind.org>2025-06-14 19:12:31 +0200
committerBirte Kristina Friesel <derf@finalrewind.org>2025-06-14 19:12:31 +0200
commitd47ff9615b551bbd844a799be7717e9e74a04266 (patch)
tree030dcceb9987123adce507df24651f0df69680ca
parent3a955c0105bf13d040a821e2c87a19694202cde6 (diff)
worker: add EFA support
-rwxr-xr-xlib/Travelynx.pm1
-rw-r--r--lib/Travelynx/Command/work.pm79
-rw-r--r--lib/Travelynx/Model/InTransit.pm102
3 files changed, 181 insertions, 1 deletions
diff --git a/lib/Travelynx.pm b/lib/Travelynx.pm
index 6ae0135..3d892ec 100755
--- a/lib/Travelynx.pm
+++ b/lib/Travelynx.pm
@@ -939,6 +939,7 @@ sub startup {
db => $db,
journey => $journey,
stop => $found,
+ trip_id => $trip_id,
backend_id => $self->stations->get_backend_id(
efa => $opt{efa}
),
diff --git a/lib/Travelynx/Command/work.pm b/lib/Travelynx/Command/work.pm
index 2b01cb2..5ea1810 100644
--- a/lib/Travelynx/Command/work.pm
+++ b/lib/Travelynx/Command/work.pm
@@ -185,6 +185,85 @@ sub run {
next;
}
+ if ( $entry->{is_efa} ) {
+ eval {
+ $self->app->efa->get_journey_p(
+ trip_id => $train_id,
+ service => $entry->{backend_name}
+ )->then(
+ sub {
+ my ($journey) = @_;
+
+ my $found_dep;
+ my $found_arr;
+ for my $stop ( $journey->route ) {
+ if ( $stop->id_num == $dep ) {
+ $found_dep = $stop;
+ }
+ if ( $arr and $stop->id_num == $arr ) {
+ $found_arr = $stop;
+ last;
+ }
+ }
+ if ( not $found_dep ) {
+ $self->app->log->debug(
+ "Did not find $dep within journey $train_id");
+ return;
+ }
+
+ if ( $found_dep->rt_dep ) {
+ $self->app->in_transit->update_departure_efa(
+ uid => $uid,
+ journey => $journey,
+ stop => $found_dep,
+ dep_eva => $dep,
+ arr_eva => $arr,
+ trip_id => $train_id,
+ );
+ }
+
+ if ( $found_arr and $found_arr->rt_arr ) {
+ $self->app->in_transit->update_arrival_efa(
+ uid => $uid,
+ journey => $journey,
+ stop => $found_arr,
+ dep_eva => $dep,
+ arr_eva => $arr,
+ trip_id => $train_id,
+ );
+ }
+ }
+ )->catch(
+ sub {
+ my ($err) = @_;
+ $backend_issues += 1;
+ $self->app->log->error(
+"work($uid) @ EFA $entry->{backend_name}: journey: $err"
+ );
+ }
+ )->wait;
+
+ if ( $arr
+ and $entry->{real_arr_ts}
+ and $now->epoch - $entry->{real_arr_ts} > 600 )
+ {
+ $self->app->checkout_p(
+ station => $arr,
+ force => 2,
+ dep_eva => $dep,
+ arr_eva => $arr,
+ uid => $uid
+ )->wait;
+ }
+ };
+ if ($@) {
+ $errors += 1;
+ $self->app->log->error(
+ "work($uid) @ EFA $entry->{backend_name}: $@");
+ }
+ next;
+ }
+
if ( $entry->{is_motis} ) {
eval {
diff --git a/lib/Travelynx/Model/InTransit.pm b/lib/Travelynx/Model/InTransit.pm
index 662df54..4f76468 100644
--- a/lib/Travelynx/Model/InTransit.pm
+++ b/lib/Travelynx/Model/InTransit.pm
@@ -176,7 +176,7 @@ sub add {
train_type => $journey->type // q{},
train_line => $journey->line,
train_no => $journey->number // q{},
- train_id => $journey->id,
+ train_id => $opt{trip_id},
sched_departure => $stop->sched_dep,
real_departure => $stop->rt_dep // $stop->sched_dep,
route => $json->encode( \@route ),
@@ -995,6 +995,43 @@ sub update_departure_dbris {
);
}
+sub update_departure_efa {
+ my ( $self, %opt ) = @_;
+ my $uid = $opt{uid};
+ my $db = $opt{db} // $self->{pg}->db;
+ my $dep_eva = $opt{dep_eva};
+ my $arr_eva = $opt{arr_eva};
+ my $journey = $opt{journey};
+ my $stop = $opt{stop};
+ my $json = JSON->new;
+
+ my $res_h = $db->select( 'in_transit', ['data'], { user_id => $uid } )
+ ->expand->hash;
+ my $ephemeral_data = $res_h ? $res_h->{data} : {};
+ if ( $stop->rt_dep ) {
+ $ephemeral_data->{rt} = 1;
+ }
+
+ say "UPDATE dep WHERE $uid $opt{trip_id} $dep_eva $arr_eva";
+
+ # selecting on user_id and train_no avoids a race condition if a user checks
+ # into a new train while we are fetching data for their previous journey. In
+ # this case, the new train would receive data from the previous journey.
+ $db->update(
+ 'in_transit',
+ {
+ data => $json->encode($ephemeral_data),
+ real_departure => $stop->rt_dep,
+ },
+ {
+ user_id => $uid,
+ train_id => $opt{trip_id},
+ checkin_station_id => $dep_eva,
+ checkout_station_id => $arr_eva,
+ }
+ );
+}
+
sub update_departure_motis {
my ( $self, %opt ) = @_;
my $uid = $opt{uid};
@@ -1188,6 +1225,69 @@ sub update_arrival_dbris {
);
}
+sub update_arrival_efa {
+ my ( $self, %opt ) = @_;
+ my $uid = $opt{uid};
+ my $db = $opt{db} // $self->{pg}->db;
+ my $dep_eva = $opt{dep_eva};
+ my $arr_eva = $opt{arr_eva};
+ my $journey = $opt{journey};
+ my $stop = $opt{stop};
+ my $json = JSON->new;
+
+ my $res_h
+ = $db->select( 'in_transit', [ 'data', 'route' ], { user_id => $uid } )
+ ->expand->hash;
+ my $ephemeral_data = $res_h ? $res_h->{data} : {};
+ my $old_route = $res_h ? $res_h->{route} : [];
+
+ if ( $stop->rt_arr ) {
+ $ephemeral_data->{rt} = 1;
+ }
+
+ my @route;
+ for my $j_stop ( $journey->route ) {
+ push(
+ @route,
+ [
+ $j_stop->full_name,
+ $j_stop->id_num,
+ {
+ sched_arr => _epoch( $j_stop->sched_arr ),
+ sched_dep => _epoch( $j_stop->sched_dep ),
+ rt_arr => _epoch( $j_stop->rt_arr ),
+ rt_dep => _epoch( $j_stop->rt_dep ),
+ arr_delay => $j_stop->arr_delay,
+ dep_delay => $j_stop->dep_delay,
+ efa_load => $j_stop->occupancy,
+ lat => $j_stop->latlon->[0],
+ lon => $j_stop->latlon->[1],
+ }
+ ]
+ );
+ }
+
+ say "UPDATE arr WHERE $uid $opt{trip_id} $dep_eva $arr_eva";
+
+ # selecting on user_id and train_no avoids a race condition if a user checks
+ # into a new train while we are fetching data for their previous journey. In
+ # this case, the new train would receive data from the previous journey.
+ $db->update(
+ 'in_transit',
+ {
+ data => $json->encode($ephemeral_data),
+ real_arrival => $stop->rt_arr,
+ route => $json->encode( [@route] ),
+ },
+ {
+ user_id => $uid,
+ train_id => $opt{trip_id},
+ checkin_station_id => $dep_eva,
+ checkout_station_id => $arr_eva,
+ }
+ );
+}
+
sub update_arrival_motis {
my ( $self, %opt ) = @_;
my $uid = $opt{uid};