diff options
Diffstat (limited to 'lib/Travelynx/Command')
-rw-r--r-- | lib/Travelynx/Command/work.pm | 341 |
1 files changed, 177 insertions, 164 deletions
diff --git a/lib/Travelynx/Command/work.pm b/lib/Travelynx/Command/work.pm index 1bc4f53..071befa 100644 --- a/lib/Travelynx/Command/work.pm +++ b/lib/Travelynx/Command/work.pm @@ -18,7 +18,7 @@ has description => 'Update real-time data of active journeys'; has usage => sub { shift->extract_usage }; sub run { - my ($self) = @_; + my ( $self, $backend ) = @_; my $now = DateTime->now( time_zone => 'Europe/Berlin' ); my $checkin_deadline = $now->clone->subtract( hours => 48 ); @@ -53,7 +53,9 @@ sub run { my $arr = $entry->{arr_eva}; my $train_id = $entry->{train_id}; - if ( $train_id eq 'manual' ) { + if ( $train_id eq 'manual' + and ( not $backend or $backend eq 'manual' ) ) + { if ( $arr and $entry->{real_arr_ts} and $now->epoch - $entry->{real_arr_ts} > 600 ) @@ -66,20 +68,21 @@ sub run { uid => $uid )->wait; } - - next; } - if ( $entry->{is_dbris} ) { + elsif ( $entry->{is_dbris} and ( not $backend or $backend eq 'dbris' ) ) + { eval { - Mojo::Promise->timer( $dbris_rate_limited ? 4.5 : 1.0 )->then( + Mojo::Promise->timer( + $dbris_rate_limited ? 4.5 : ( $backend ? 1.2 : 1.0 ) ) + ->then( sub { return $self->app->dbris->get_journey_p( trip_id => $train_id ); } - )->then( + )->then( sub { my ($journey) = @_; @@ -172,7 +175,7 @@ sub run { )->wait; } } - )->catch( + )->catch( sub { my ($err) = @_; $self->app->log->debug( @@ -186,7 +189,7 @@ sub run { $backend_issues += 1; } } - )->wait; + )->wait; if ( $arr and $entry->{real_arr_ts} @@ -206,10 +209,9 @@ sub run { $self->app->log->error( "work($uid) @ DBRIS $entry->{backend_name}: $@"); } - next; } - if ( $entry->{is_efa} ) { + elsif ( $entry->{is_efa} and ( not $backend or $backend eq 'efa' ) ) { eval { $self->app->efa->get_journey_p( trip_id => $train_id, @@ -302,10 +304,10 @@ sub run { $self->app->log->error( "work($uid) @ EFA $entry->{backend_name}: $@"); } - next; } - if ( $entry->{is_motis} ) { + elsif ( $entry->{is_motis} and ( not $backend or $backend eq 'motis' ) ) + { eval { $self->app->motis->get_trip_p( @@ -326,6 +328,10 @@ sub run { stop => $stopover->stop, motis => $entry->{backend_name}, ); + + $self->app->log->debug( "mapped " + . $stopover->stop->id . " to " + . $stopover->stop->{eva} ); } } @@ -399,10 +405,10 @@ sub run { $self->app->log->error( "work($uid) @ MOTIS $entry->{backend_name}: $@"); } - next; } - if ( $entry->{is_hafas} ) { + elsif ( $entry->{is_hafas} and ( not $backend or $backend eq 'hafas' ) ) + { eval { @@ -533,7 +539,6 @@ sub run { $self->app->log->error( "work($uid) @ HAFAS $entry->{backend_name}: $@"); } - next; } # TODO irgendwo ist hier ne race condition wo ein neuer checkin (in HAFAS) mit IRIS-Daten überschrieben wird. @@ -545,182 +550,186 @@ sub run { # update departure data for up to 15 minutes after departure and # delaying automatic checkout by at least 10 minutes. - eval { - if ( $now->epoch - $entry->{real_dep_ts} < 900 ) { - my $status = $self->app->iris->get_departures( - station => $dep, - lookbehind => 30, - lookahead => 30 - ); - if ( $status->{errstr} ) { - die("get_departures($dep): $status->{errstr}\n"); - } - - my ($train) = List::Util::first { $_->train_id eq $train_id } - @{ $status->{results} }; + elsif ( $entry->{is_iris} and ( not $backend or $backend eq 'iris' ) ) { + eval { + if ( $now->epoch - $entry->{real_dep_ts} < 900 ) { + my $status = $self->app->iris->get_departures( + station => $dep, + lookbehind => 30, + lookahead => 30 + ); + if ( $status->{errstr} ) { + die("get_departures($dep): $status->{errstr}\n"); + } - if ( not $train ) { - $self->app->log->debug( - "could not find train $train_id at $dep\n"); - return; - } + my ($train) + = List::Util::first { $_->train_id eq $train_id } + @{ $status->{results} }; - $self->app->in_transit->update_departure( - uid => $uid, - train => $train, - dep_eva => $dep, - arr_eva => $arr, - route => [ $self->app->iris->route_diff($train) ] - ); + if ( not $train ) { + $self->app->log->debug( + "could not find train $train_id at $dep\n"); + return; + } - if ( $train->departure_is_cancelled and $arr ) { - my $checked_in - = $self->app->in_transit->update_departure_cancelled( + $self->app->in_transit->update_departure( uid => $uid, train => $train, dep_eva => $dep, arr_eva => $arr, - ); - - # depending on the amount of users in transit, some time may - # have passed between fetching $entry from the database and - # now. Only check out if the user is still checked into this - # train. - if ($checked_in) { + route => [ $self->app->iris->route_diff($train) ] + ); - # check out (adds a cancelled journey and resets journey state - # to checkin - $self->app->checkout_p( - station => $arr, - force => 2, + if ( $train->departure_is_cancelled and $arr ) { + my $checked_in + = $self->app->in_transit->update_departure_cancelled( + uid => $uid, + train => $train, dep_eva => $dep, arr_eva => $arr, - uid => $uid - )->wait; + ); + + # depending on the amount of users in transit, some time may + # have passed between fetching $entry from the database and + # now. Only check out if the user is still checked into this + # train. + if ($checked_in) { + + # check out (adds a cancelled journey and resets journey state + # to checkin + $self->app->checkout_p( + station => $arr, + force => 2, + dep_eva => $dep, + arr_eva => $arr, + uid => $uid + )->wait; + } + } + else { + $self->app->add_route_timestamps( $uid, $train, 1 ); + $self->app->add_wagonorder( + uid => $uid, + train_id => $train->train_id, + is_departure => 1, + eva => $dep, + datetime => $train->sched_departure, + train_type => $train->type, + train_no => $train->train_no + ); + $self->app->add_stationinfo( $uid, 1, $train->train_id, + $dep, $arr ); } } - else { - $self->app->add_route_timestamps( $uid, $train, 1 ); - $self->app->add_wagonorder( - uid => $uid, - train_id => $train->train_id, - is_departure => 1, - eva => $dep, - datetime => $train->sched_departure, - train_type => $train->type, - train_no => $train->train_no - ); - $self->app->add_stationinfo( $uid, 1, $train->train_id, - $dep, $arr ); - } + }; + if ($@) { + $errors += 1; + $self->app->log->error("work($uid) @ IRIS: departure: $@"); } - }; - if ($@) { - $errors += 1; - $self->app->log->error("work($uid) @ IRIS: departure: $@"); - } - eval { - if ( - $arr - and ( not $entry->{real_arr_ts} - or $now->epoch - $entry->{real_arr_ts} < 600 ) - ) - { - my $status = $self->app->iris->get_departures( - station => $arr, - lookbehind => 20, - lookahead => 220 - ); - if ( $status->{errstr} ) { - die("get_departures($arr): $status->{errstr}\n"); - } + eval { + if ( + $arr + and ( not $entry->{real_arr_ts} + or $now->epoch - $entry->{real_arr_ts} < 600 ) + ) + { + my $status = $self->app->iris->get_departures( + station => $arr, + lookbehind => 20, + lookahead => 220 + ); + if ( $status->{errstr} ) { + die("get_departures($arr): $status->{errstr}\n"); + } - # Note that a train may pass the same station several times. - # Notable example: S41 / S42 ("Ringbahn") both starts and - # terminates at Berlin Südkreuz - my ($train) = List::Util::first { - $_->train_id eq $train_id - and $_->sched_arrival - and $_->sched_arrival->epoch > $entry->{sched_dep_ts} - } - @{ $status->{results} }; + # Note that a train may pass the same station several times. + # Notable example: S41 / S42 ("Ringbahn") both starts and + # terminates at Berlin Südkreuz + my ($train) = List::Util::first { + $_->train_id eq $train_id + and $_->sched_arrival + and $_->sched_arrival->epoch > $entry->{sched_dep_ts} + } + @{ $status->{results} }; - $train //= List::Util::first { $_->train_id eq $train_id } - @{ $status->{results} }; + $train //= List::Util::first { $_->train_id eq $train_id } + @{ $status->{results} }; - if ( not $train ) { + if ( not $train ) { - # If we haven't seen the train yet, its arrival is probably - # too far in the future. This is not critical. - return; - } + # If we haven't seen the train yet, its arrival is probably + # too far in the future. This is not critical. + return; + } - my $checked_in = $self->app->in_transit->update_arrival( - uid => $uid, - train => $train, - route => [ $self->app->iris->route_diff($train) ], - dep_eva => $dep, - arr_eva => $arr, - ); + my $checked_in = $self->app->in_transit->update_arrival( + uid => $uid, + train => $train, + route => [ $self->app->iris->route_diff($train) ], + dep_eva => $dep, + arr_eva => $arr, + ); - if ( $checked_in and $train->arrival_is_cancelled ) { + if ( $checked_in and $train->arrival_is_cancelled ) { - # check out (adds a cancelled journey and resets journey state - # to destination selection) - $self->app->checkout_p( + # check out (adds a cancelled journey and resets journey state + # to destination selection) + $self->app->checkout_p( + station => $arr, + force => 0, + dep_eva => $dep, + arr_eva => $arr, + uid => $uid + )->wait; + } + else { + $self->app->add_route_timestamps( + $uid, $train, 0, + ( + defined $entry->{real_arr_ts} + and $now->epoch > $entry->{real_arr_ts} + ) ? 1 : 0 + ); + $self->app->add_wagonorder( + uid => $uid, + train_id => $train->train_id, + is_arrival => 1, + eva => $arr, + datetime => $train->sched_departure, + train_type => $train->type, + train_no => $train->train_no + ); + $self->app->add_stationinfo( $uid, 0, $train->train_id, + $dep, $arr ); + } + } + elsif ( $entry->{real_arr_ts} ) { + my ( undef, $error ) = $self->app->checkout_p( station => $arr, - force => 0, + force => 2, dep_eva => $dep, arr_eva => $arr, uid => $uid + )->catch( + sub { + my ($error) = @_; + $backend_issues += 1; + $self->app->log->error( + "work($uid) @ IRIS: arrival: $error"); + $errors += 1; + } )->wait; } - else { - $self->app->add_route_timestamps( - $uid, $train, 0, - ( - defined $entry->{real_arr_ts} - and $now->epoch > $entry->{real_arr_ts} - ) ? 1 : 0 - ); - $self->app->add_wagonorder( - uid => $uid, - train_id => $train->train_id, - is_arrival => 1, - eva => $arr, - datetime => $train->sched_departure, - train_type => $train->type, - train_no => $train->train_no - ); - $self->app->add_stationinfo( $uid, 0, $train->train_id, - $dep, $arr ); - } - } - elsif ( $entry->{real_arr_ts} ) { - my ( undef, $error ) = $self->app->checkout_p( - station => $arr, - force => 2, - dep_eva => $dep, - arr_eva => $arr, - uid => $uid - )->catch( - sub { - my ($error) = @_; - $backend_issues += 1; - $self->app->log->error( - "work($uid) @ IRIS: arrival: $error"); - $errors += 1; - } - )->wait; + }; + if ($@) { + $self->app->log->error("work($uid) @ IRIS: arrival: $@"); + $errors += 1; } - }; - if ($@) { - $self->app->log->error("work($uid) @ IRIS: arrival: $@"); - $errors += 1; + + eval { }; } - eval { }; } my $started_at = $now; @@ -728,15 +737,19 @@ sub run { my $worker_duration = $main_finished_at->epoch - $started_at->epoch; if ( $self->app->config->{influxdb}->{url} ) { + my $tags = q{}; + if ($backend) { + $tags .= ",backend=${backend}"; + } if ( $self->app->mode eq 'development' ) { $self->app->log->debug( 'POST ' . $self->app->config->{influxdb}->{url} - . " worker runtime_seconds=${worker_duration},errors=${errors},backend_errors=${backend_issues},ratelimit_count=${rate_limit_counts}" + . " worker${tags} runtime_seconds=${worker_duration},errors=${errors},backend_errors=${backend_issues},ratelimit_count=${rate_limit_counts}" ); } else { $self->app->ua->post_p( $self->app->config->{influxdb}->{url}, -"worker runtime_seconds=${worker_duration},errors=${errors},backend_errors=${backend_issues},ratelimit_count=${rate_limit_counts}" +"worker${tags} runtime_seconds=${worker_duration},errors=${errors},backend_errors=${backend_issues},ratelimit_count=${rate_limit_counts}" )->wait; } } |