diff options
| author | Birte Kristina Friesel <derf@finalrewind.org> | 2025-06-27 20:23:32 +0200 | 
|---|---|---|
| committer | Birte Kristina Friesel <derf@finalrewind.org> | 2025-06-27 20:23:32 +0200 | 
| commit | f136a53b56961d6a1704670198bf20a66917db9b (patch) | |
| tree | aaa133d02e6810d315e7e94ace4535050600a877 /lib/Travelynx | |
| parent | 7cd9c86dcc80b24e86aaaa9624210330e310342f (diff) | |
work: support distinct worker invocations per backend
Diffstat (limited to 'lib/Travelynx')
| -rw-r--r-- | lib/Travelynx/Command/work.pm | 331 | 
1 files changed, 171 insertions, 160 deletions
diff --git a/lib/Travelynx/Command/work.pm b/lib/Travelynx/Command/work.pm index 1bc4f53..fa0a2ad 100644 --- a/lib/Travelynx/Command/work.pm +++ b/lib/Travelynx/Command/work.pm @@ -18,7 +18,7 @@ has description => 'Update real-time data of active journeys';  has usage => sub { shift->extract_usage };  sub run { -	my ($self) = @_; +	my ( $self, $backend ) = @_;  	my $now              = DateTime->now( time_zone => 'Europe/Berlin' );  	my $checkin_deadline = $now->clone->subtract( hours => 48 ); @@ -53,7 +53,9 @@ sub run {  		my $arr      = $entry->{arr_eva};  		my $train_id = $entry->{train_id}; -		if ( $train_id eq 'manual' ) { +		if ( $train_id eq 'manual' +			and ( not $backend or $backend eq 'manual' ) ) +		{  			if (    $arr  				and $entry->{real_arr_ts}  				and $now->epoch - $entry->{real_arr_ts} > 600 ) @@ -66,11 +68,10 @@ sub run {  					uid     => $uid  				)->wait;  			} - -			next;  		} -		if ( $entry->{is_dbris} ) { +		elsif ( $entry->{is_dbris} and ( not $backend or $backend eq 'dbris' ) ) +		{  			eval { @@ -206,10 +207,9 @@ sub run {  				$self->app->log->error(  					"work($uid) @ DBRIS $entry->{backend_name}: $@");  			} -			next;  		} -		if ( $entry->{is_efa} ) { +		elsif ( $entry->{is_efa} and ( not $backend or $backend eq 'efa' ) ) {  			eval {  				$self->app->efa->get_journey_p(  					trip_id => $train_id, @@ -302,10 +302,10 @@ sub run {  				$self->app->log->error(  					"work($uid) @ EFA $entry->{backend_name}: $@");  			} -			next;  		} -		if ( $entry->{is_motis} ) { +		elsif ( $entry->{is_motis} and ( not $backend or $backend eq 'motis' ) ) +		{  			eval {  				$self->app->motis->get_trip_p( @@ -326,6 +326,10 @@ sub run {  									stop  => $stopover->stop,  									motis => $entry->{backend_name},  								); + +								$self->app->log->debug( "mapped " +									  . $stopover->stop->id . " to " +									  . $stopover->stop->{eva} );  							}  						} @@ -399,10 +403,10 @@ sub run {  				$self->app->log->error(  					"work($uid) @ MOTIS $entry->{backend_name}: $@");  			} -			next;  		} -		if ( $entry->{is_hafas} ) { +		elsif ( $entry->{is_hafas} and ( not $backend or $backend eq 'hafas' ) ) +		{  			eval { @@ -533,7 +537,6 @@ sub run {  				$self->app->log->error(  					"work($uid) @ HAFAS $entry->{backend_name}: $@");  			} -			next;  		}  		# TODO irgendwo ist hier ne race condition wo ein neuer checkin (in HAFAS) mit IRIS-Daten überschrieben wird. @@ -545,182 +548,186 @@ sub run {  		# update departure data for up to 15 minutes after departure and  		# delaying automatic checkout by at least 10 minutes. -		eval { -			if ( $now->epoch - $entry->{real_dep_ts} < 900 ) { -				my $status = $self->app->iris->get_departures( -					station    => $dep, -					lookbehind => 30, -					lookahead  => 30 -				); -				if ( $status->{errstr} ) { -					die("get_departures($dep): $status->{errstr}\n"); -				} - -				my ($train) = List::Util::first { $_->train_id eq $train_id } -				@{ $status->{results} }; +		elsif ( $entry->{is_iris} and ( not $backend or $backend eq 'iris' ) ) { +			eval { +				if ( $now->epoch - $entry->{real_dep_ts} < 900 ) { +					my $status = $self->app->iris->get_departures( +						station    => $dep, +						lookbehind => 30, +						lookahead  => 30 +					); +					if ( $status->{errstr} ) { +						die("get_departures($dep): $status->{errstr}\n"); +					} -				if ( not $train ) { -					$self->app->log->debug( -						"could not find train $train_id at $dep\n"); -					return; -				} +					my ($train) +					  = List::Util::first { $_->train_id eq $train_id } +					@{ $status->{results} }; -				$self->app->in_transit->update_departure( -					uid     => $uid, -					train   => $train, -					dep_eva => $dep, -					arr_eva => $arr, -					route   => [ $self->app->iris->route_diff($train) ] -				); +					if ( not $train ) { +						$self->app->log->debug( +							"could not find train $train_id at $dep\n"); +						return; +					} -				if ( $train->departure_is_cancelled and $arr ) { -					my $checked_in -					  = $self->app->in_transit->update_departure_cancelled( +					$self->app->in_transit->update_departure(  						uid     => $uid,  						train   => $train,  						dep_eva => $dep,  						arr_eva => $arr, -					  ); - -					# depending on the amount of users in transit, some time may -					# have passed between fetching $entry from the database and -					# now. Only check out if the user is still checked into this -					# train. -					if ($checked_in) { +						route   => [ $self->app->iris->route_diff($train) ] +					); -						# check out (adds a cancelled journey and resets journey state -						# to checkin -						$self->app->checkout_p( -							station => $arr, -							force   => 2, +					if ( $train->departure_is_cancelled and $arr ) { +						my $checked_in +						  = $self->app->in_transit->update_departure_cancelled( +							uid     => $uid, +							train   => $train,  							dep_eva => $dep,  							arr_eva => $arr, -							uid     => $uid -						)->wait; +						  ); + +						# depending on the amount of users in transit, some time may +						# have passed between fetching $entry from the database and +						# now. Only check out if the user is still checked into this +						# train. +						if ($checked_in) { + +							# check out (adds a cancelled journey and resets journey state +							# to checkin +							$self->app->checkout_p( +								station => $arr, +								force   => 2, +								dep_eva => $dep, +								arr_eva => $arr, +								uid     => $uid +							)->wait; +						} +					} +					else { +						$self->app->add_route_timestamps( $uid, $train, 1 ); +						$self->app->add_wagonorder( +							uid          => $uid, +							train_id     => $train->train_id, +							is_departure => 1, +							eva          => $dep, +							datetime     => $train->sched_departure, +							train_type   => $train->type, +							train_no     => $train->train_no +						); +						$self->app->add_stationinfo( $uid, 1, $train->train_id, +							$dep, $arr );  					}  				} -				else { -					$self->app->add_route_timestamps( $uid, $train, 1 ); -					$self->app->add_wagonorder( -						uid          => $uid, -						train_id     => $train->train_id, -						is_departure => 1, -						eva          => $dep, -						datetime     => $train->sched_departure, -						train_type   => $train->type, -						train_no     => $train->train_no -					); -					$self->app->add_stationinfo( $uid, 1, $train->train_id, -						$dep, $arr ); -				} +			}; +			if ($@) { +				$errors += 1; +				$self->app->log->error("work($uid) @ IRIS: departure: $@");  			} -		}; -		if ($@) { -			$errors += 1; -			$self->app->log->error("work($uid) @ IRIS: departure: $@"); -		} -		eval { -			if ( -				$arr -				and ( not $entry->{real_arr_ts} -					or $now->epoch - $entry->{real_arr_ts} < 600 ) -			  ) -			{ -				my $status = $self->app->iris->get_departures( -					station    => $arr, -					lookbehind => 20, -					lookahead  => 220 -				); -				if ( $status->{errstr} ) { -					die("get_departures($arr): $status->{errstr}\n"); -				} +			eval { +				if ( +					$arr +					and ( not $entry->{real_arr_ts} +						or $now->epoch - $entry->{real_arr_ts} < 600 ) +				  ) +				{ +					my $status = $self->app->iris->get_departures( +						station    => $arr, +						lookbehind => 20, +						lookahead  => 220 +					); +					if ( $status->{errstr} ) { +						die("get_departures($arr): $status->{errstr}\n"); +					} -				# Note that a train may pass the same station several times. -				# Notable example: S41 / S42 ("Ringbahn") both starts and -				# terminates at Berlin Südkreuz -				my ($train) = List::Util::first { -					$_->train_id eq $train_id -					  and $_->sched_arrival -					  and $_->sched_arrival->epoch > $entry->{sched_dep_ts} -				} -				@{ $status->{results} }; +					# Note that a train may pass the same station several times. +					# Notable example: S41 / S42 ("Ringbahn") both starts and +					# terminates at Berlin Südkreuz +					my ($train) = List::Util::first { +						$_->train_id eq $train_id +						  and $_->sched_arrival +						  and $_->sched_arrival->epoch > $entry->{sched_dep_ts} +					} +					@{ $status->{results} }; -				$train //= List::Util::first { $_->train_id eq $train_id } -				@{ $status->{results} }; +					$train //= List::Util::first { $_->train_id eq $train_id } +					@{ $status->{results} }; -				if ( not $train ) { +					if ( not $train ) { -					# If we haven't seen the train yet, its arrival is probably -					# too far in the future. This is not critical. -					return; -				} +						# If we haven't seen the train yet, its arrival is probably +						# too far in the future. This is not critical. +						return; +					} -				my $checked_in = $self->app->in_transit->update_arrival( -					uid     => $uid, -					train   => $train, -					route   => [ $self->app->iris->route_diff($train) ], -					dep_eva => $dep, -					arr_eva => $arr, -				); +					my $checked_in = $self->app->in_transit->update_arrival( +						uid     => $uid, +						train   => $train, +						route   => [ $self->app->iris->route_diff($train) ], +						dep_eva => $dep, +						arr_eva => $arr, +					); -				if ( $checked_in and $train->arrival_is_cancelled ) { +					if ( $checked_in and $train->arrival_is_cancelled ) { -					# check out (adds a cancelled journey and resets journey state -					# to destination selection) -					$self->app->checkout_p( +						# check out (adds a cancelled journey and resets journey state +						# to destination selection) +						$self->app->checkout_p( +							station => $arr, +							force   => 0, +							dep_eva => $dep, +							arr_eva => $arr, +							uid     => $uid +						)->wait; +					} +					else { +						$self->app->add_route_timestamps( +							$uid, $train, 0, +							( +								defined $entry->{real_arr_ts} +								  and $now->epoch > $entry->{real_arr_ts} +							) ? 1 : 0 +						); +						$self->app->add_wagonorder( +							uid        => $uid, +							train_id   => $train->train_id, +							is_arrival => 1, +							eva        => $arr, +							datetime   => $train->sched_departure, +							train_type => $train->type, +							train_no   => $train->train_no +						); +						$self->app->add_stationinfo( $uid, 0, $train->train_id, +							$dep, $arr ); +					} +				} +				elsif ( $entry->{real_arr_ts} ) { +					my ( undef, $error ) = $self->app->checkout_p(  						station => $arr, -						force   => 0, +						force   => 2,  						dep_eva => $dep,  						arr_eva => $arr,  						uid     => $uid +					)->catch( +						sub { +							my ($error) = @_; +							$backend_issues += 1; +							$self->app->log->error( +								"work($uid) @ IRIS: arrival: $error"); +							$errors += 1; +						}  					)->wait;  				} -				else { -					$self->app->add_route_timestamps( -						$uid, $train, 0, -						( -							defined $entry->{real_arr_ts} -							  and $now->epoch > $entry->{real_arr_ts} -						) ? 1 : 0 -					); -					$self->app->add_wagonorder( -						uid        => $uid, -						train_id   => $train->train_id, -						is_arrival => 1, -						eva        => $arr, -						datetime   => $train->sched_departure, -						train_type => $train->type, -						train_no   => $train->train_no -					); -					$self->app->add_stationinfo( $uid, 0, $train->train_id, -						$dep, $arr ); -				} -			} -			elsif ( $entry->{real_arr_ts} ) { -				my ( undef, $error ) = $self->app->checkout_p( -					station => $arr, -					force   => 2, -					dep_eva => $dep, -					arr_eva => $arr, -					uid     => $uid -				)->catch( -					sub { -						my ($error) = @_; -						$backend_issues += 1; -						$self->app->log->error( -							"work($uid) @ IRIS: arrival: $error"); -						$errors += 1; -					} -				)->wait; +			}; +			if ($@) { +				$self->app->log->error("work($uid) @ IRIS: arrival: $@"); +				$errors += 1;  			} -		}; -		if ($@) { -			$self->app->log->error("work($uid) @ IRIS: arrival: $@"); -			$errors += 1; + +			eval { };  		} -		eval { };  	}  	my $started_at       = $now; @@ -728,15 +735,19 @@ sub run {  	my $worker_duration  = $main_finished_at->epoch - $started_at->epoch;  	if ( $self->app->config->{influxdb}->{url} ) { +		my $tags = q{}; +		if ($backend) { +			$tags .= ",backend=${backend}"; +		}  		if ( $self->app->mode eq 'development' ) {  			$self->app->log->debug( 'POST '  				  . $self->app->config->{influxdb}->{url} -				  . " worker runtime_seconds=${worker_duration},errors=${errors},backend_errors=${backend_issues},ratelimit_count=${rate_limit_counts}" +				  . " worker${tags} runtime_seconds=${worker_duration},errors=${errors},backend_errors=${backend_issues},ratelimit_count=${rate_limit_counts}"  			);  		}  		else {  			$self->app->ua->post_p( $self->app->config->{influxdb}->{url}, -"worker runtime_seconds=${worker_duration},errors=${errors},backend_errors=${backend_issues},ratelimit_count=${rate_limit_counts}" +"worker${tags} runtime_seconds=${worker_duration},errors=${errors},backend_errors=${backend_issues},ratelimit_count=${rate_limit_counts}"  			)->wait;  		}  	}  | 
