summaryrefslogtreecommitdiff
path: root/lib/Travelynx/Command/work.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/Travelynx/Command/work.pm')
-rw-r--r--lib/Travelynx/Command/work.pm332
1 files changed, 163 insertions, 169 deletions
diff --git a/lib/Travelynx/Command/work.pm b/lib/Travelynx/Command/work.pm
index 24621b5..10b1b69 100644
--- a/lib/Travelynx/Command/work.pm
+++ b/lib/Travelynx/Command/work.pm
@@ -1,16 +1,16 @@
package Travelynx::Command::work;
-# Copyright (C) 2020 Daniel Friesel
+# Copyright (C) 2020-2023 Birte Kristina Friesel
#
# SPDX-License-Identifier: AGPL-3.0-or-later
use Mojo::Base 'Mojolicious::Command';
+use Mojo::Promise;
use DateTime;
use JSON;
use List::Util;
-has description =>
- 'Perform automatic checkout when users arrive at their destination';
+has description => 'Update real-time data of active journeys';
has usage => sub { shift->extract_usage };
@@ -21,24 +21,94 @@ sub run {
my $checkin_deadline = $now->clone->subtract( hours => 48 );
my $json = JSON->new;
- my $db = $self->app->pg->db;
+ my $num_incomplete = $self->app->in_transit->delete_incomplete_checkins(
+ earlier_than => $checkin_deadline );
- my $res = $db->delete( 'in_transit',
- { checkin_time => { '<', $checkin_deadline } } );
-
- if ( my $rows = $res->rows ) {
- $self->app->log->debug("Removed ${rows} incomplete checkins");
+ if ($num_incomplete) {
+ $self->app->log->debug("Removed ${num_incomplete} incomplete checkins");
}
- for my $entry (
- $db->select( 'in_transit_str', '*', { cancelled => 0 } )->hashes->each )
- {
+ my $errors = 0;
+
+ for my $entry ( $self->app->in_transit->get_all_active ) {
my $uid = $entry->{user_id};
my $dep = $entry->{dep_eva};
my $arr = $entry->{arr_eva};
my $train_id = $entry->{train_id};
+ if ( $train_id =~ m{[|]} ) {
+
+ $self->app->hafas->get_journey_p( trip_id => $train_id )->then(
+ sub {
+ my ($journey) = @_;
+
+ my $found_dep;
+ my $found_arr;
+ for my $stop ( $journey->route ) {
+ if ( $stop->loc->eva == $dep ) {
+ $found_dep = $stop;
+ }
+ if ( $arr and $stop->loc->eva == $arr ) {
+ $found_arr = $stop;
+ last;
+ }
+ }
+ if ( not $found_dep ) {
+ $self->app->log->debug(
+ "Did not find $dep within journey $train_id");
+ return;
+ }
+
+ if ( $found_dep->{rt_dep} ) {
+ $self->app->in_transit->update_departure_hafas(
+ uid => $uid,
+ journey => $journey,
+ stop => $found_dep,
+ dep_eva => $dep,
+ arr_eva => $arr
+ );
+ }
+
+ if ( $found_arr and $found_arr->{rt_arr} ) {
+ $self->app->in_transit->update_arrival_hafas(
+ uid => $uid,
+ journey => $journey,
+ stop => $found_arr,
+ dep_eva => $dep,
+ arr_eva => $arr
+ );
+ }
+ }
+ )->catch(
+ sub {
+ my ($err) = @_;
+ if ( $err =~ m{svcResL\[0\][.]err is (?:FAIL|PARAMETER)$} )
+ {
+ # HAFAS do be weird. These are not actionable.
+ $self->app->log->debug("work($uid)/journey: $err");
+ }
+ else {
+ $self->app->log->error("work($uid)/journey: $err");
+ }
+ }
+ )->wait;
+
+ if ( $arr
+ and $entry->{real_arr_ts}
+ and $now->epoch - $entry->{real_arr_ts} > 600 )
+ {
+ $self->app->checkout_p(
+ station => $arr,
+ force => 2,
+ dep_eva => $dep,
+ arr_eva => $arr,
+ uid => $uid
+ )->wait;
+ }
+ next;
+ }
+
# Note: IRIS data is not always updated in real-time. Both departure and
# arrival delays may take several minutes to appear, especially in case
# of large-scale disturbances. We work around this by continuing to
@@ -60,72 +130,43 @@ sub run {
@{ $status->{results} };
if ( not $train ) {
- die("could not find train $train_id at $dep\n");
+ $self->app->log->debug(
+ "could not find train $train_id at $dep\n");
+ return;
}
- # selecting on user_id and train_no avoids a race condition when
- # a user checks into a new train while we are fetching data for
- # their previous journey. In this case, the new train would
- # receive data from the previous journey.
- $db->update(
- 'in_transit',
- {
- dep_platform => $train->platform,
- real_departure => $train->departure,
- route => $json->encode(
- [ $self->app->iris->route_diff($train) ]
- ),
- messages => $json->encode(
- [
- map { [ $_->[0]->epoch, $_->[1] ] }
- $train->messages
- ]
- ),
- },
- {
- user_id => $uid,
- train_no => $train->train_no
- }
+ $self->app->in_transit->update_departure(
+ uid => $uid,
+ train => $train,
+ dep_eva => $dep,
+ arr_eva => $arr,
+ route => [ $self->app->iris->route_diff($train) ]
);
+
if ( $train->departure_is_cancelled and $arr ) {
+ my $checked_in
+ = $self->app->in_transit->update_departure_cancelled(
+ uid => $uid,
+ train => $train,
+ dep_eva => $dep,
+ arr_eva => $arr,
+ );
# depending on the amount of users in transit, some time may
# have passed between fetching $entry from the database and
- # now. Ensure that the user is still checked into this train
- # before calling checkout to mark the cancellation.
- if (
- $db->select(
- 'in_transit',
- 'count(*) as count',
- {
- user_id => $uid,
- train_no => $train->train_no,
- checkin_station_id => $dep,
- checkout_station_id => $arr,
- }
- )->hash->{count}
- )
- {
- $db->update(
- 'in_transit',
- {
- cancelled => 1,
- },
- {
- user_id => $uid,
- train_no => $train->train_no,
- checkin_station_id => $dep,
- checkout_station_id => $arr,
- }
- );
+ # now. Only check out if the user is still checked into this
+ # train.
+ if ($checked_in) {
- # check out (adds a cancelled journey and resets journey state
- # to checkin
- $self->app->checkout(
+ # check out (adds a cancelled journey and resets journey state
+ # to checkin
+ $self->app->checkout_p(
station => $arr,
- force => 1,
+ force => 2,
+ dep_eva => $dep,
+ arr_eva => $arr,
uid => $uid
- );
+ )->wait;
}
}
else {
@@ -134,6 +175,7 @@ sub run {
}
};
if ($@) {
+ $errors += 1;
$self->app->log->error("work($uid)/departure: $@");
}
@@ -173,128 +215,80 @@ sub run {
return;
}
- # selecting on user_id, train_no and checkout_station_id avoids a
- # race condition when a user checks into a new train or changes
- # their destination station while we are fetching times based on no
- # longer valid database entries.
- $db->update(
- 'in_transit',
- {
- arr_platform => $train->platform,
- sched_arrival => $train->sched_arrival,
- real_arrival => $train->arrival,
- route => $json->encode(
- [ $self->app->iris->route_diff($train) ]
- ),
- messages => $json->encode(
- [
- map { [ $_->[0]->epoch, $_->[1] ] }
- $train->messages
- ]
- ),
- },
- {
- user_id => $uid,
- train_no => $train->train_no,
- checkout_station_id => $arr
- }
+ my $checked_in = $self->app->in_transit->update_arrival(
+ uid => $uid,
+ train => $train,
+ route => [ $self->app->iris->route_diff($train) ],
+ dep_eva => $dep,
+ arr_eva => $arr,
);
- if ( $train->arrival_is_cancelled ) {
- # depending on the amount of users in transit, some time may
- # have passed between fetching $entry from the database and
- # now. Ensure that the user is still checked into this train
- # before calling checkout to mark the cancellation.
- if (
- $db->select(
- 'in_transit',
- 'count(*) as count',
- {
- user_id => $uid,
- train_no => $train->train_no,
- checkout_station_id => $arr
- }
- )->hash->{count}
- )
- {
- # check out (adds a cancelled journey and resets journey state
- # to destination selection)
- $self->app->checkout(
- station => $arr,
- force => 0,
- uid => $uid
- );
- }
+ if ( $checked_in and $train->arrival_is_cancelled ) {
+
+ # check out (adds a cancelled journey and resets journey state
+ # to destination selection)
+ $self->app->checkout_p(
+ station => $arr,
+ force => 0,
+ dep_eva => $dep,
+ arr_eva => $arr,
+ uid => $uid
+ )->wait;
}
else {
- $self->app->add_route_timestamps( $uid, $train, 0 );
+ $self->app->add_route_timestamps(
+ $uid, $train, 0,
+ (
+ defined $entry->{real_arr_ts}
+ and $now->epoch > $entry->{real_arr_ts}
+ ) ? 1 : 0
+ );
}
}
elsif ( $entry->{real_arr_ts} ) {
- my ( undef, $error ) = $self->app->checkout(
+ my ( undef, $error ) = $self->app->checkout_p(
station => $arr,
- force => 1,
+ force => 2,
+ dep_eva => $dep,
+ arr_eva => $arr,
uid => $uid
- );
- if ($error) {
- die("${error}\n");
- }
+ )->catch(
+ sub {
+ my ($error) = @_;
+ $self->app->log->error("work($uid)/arrival: $error");
+ $errors += 1;
+ }
+ )->wait;
}
};
if ($@) {
$self->app->log->error("work($uid)/arrival: $@");
+ $errors += 1;
}
- eval { }
+ eval { };
}
- for my $account_data ( $self->app->traewelling->get_pull_accounts ) {
-
- # $account_data->{user_id} is the travelynx uid
- # $account_data->{user_name} is the Träwelling username
- $self->app->log->debug(
- "Pulling Traewelling status for UID $account_data->{user_id}");
- $self->app->traewelling_api->get_status_p(
- username => $account_data->{data}{user_name},
- token => $account_data->{token}
- )->then(
- sub {
- my ($traewelling) = @_;
- $self->app->traewelling_to_travelynx(
- traewelling => $traewelling,
- user_data => $account_data
- );
- }
- )->catch(
- sub {
- my ($err) = @_;
- $self->app->log->debug("Error $err");
- }
- )->wait;
- }
+ my $started_at = $now;
+ my $main_finished_at = DateTime->now( time_zone => 'Europe/Berlin' );
+ my $worker_duration = $main_finished_at->epoch - $started_at->epoch;
- for my $candidate ( $self->app->traewelling->get_pushable_accounts ) {
- $self->app->log->debug(
- "Pushing to Traewelling for UID $candidate->{uid}");
- my $trip_id = $candidate->{journey_data}{trip_id};
- if ( not $trip_id ) {
- $self->app->log->debug("... trip_id is missing");
- $self->app->traewelling->log(
- uid => $candidate->{uid},
- message =>
-"Fehler bei $candidate->{train_type} $candidate->{train_no}: Keine trip_id vorhanden",
- is_error => 1
+ if ( $self->app->config->{influxdb}->{url} ) {
+ if ( $self->app->mode eq 'development' ) {
+ $self->app->log->debug( 'POST '
+ . $self->app->config->{influxdb}->{url}
+ . " worker runtime_seconds=${worker_duration},errors=${errors}"
);
- next;
}
- if ( $candidate->{data}{latest_push_ts}
- and $candidate->{data}{latest_push_ts} == $candidate->{checkin_ts} )
- {
- $self->app->log->debug("... already handled");
- next;
+ else {
+ $self->app->ua->post_p( $self->app->config->{influxdb}->{url},
+ "worker runtime_seconds=${worker_duration},errors=${errors}" )
+ ->wait;
}
- $self->app->traewelling_api->checkin( %{$candidate},
- trip_id => $trip_id );
+ }
+
+ if ( not $self->app->config->{traewelling}->{separate_worker} ) {
+ $self->app->start('traewelling');
}
}