#!/usr/bin/env perl use strict; use warnings; use 5.020; our $VERSION = '0.00'; use File::Path qw(make_path); use File::Slurp qw(read_file write_file); use Net::MQTT::Simple; use YAML::XS; sub load_config { my ($config_file) = @_; my $content = read_file($config_file); my $yaml = Load($content); return $yaml; } my ($config_file) = @ARGV; if ( not defined $config_file ) { die("Usage: $0 \n"); } my $user_config = load_config($config_file); my $outdir = $user_config->{directory}; my $server = $user_config->{server}; my %config; for my $subscription ( @{ $user_config->{subscriptions} } ) { $config{ $subscription->{topic} } = {}; for my $key (qw(freshness norm_factor)) { if ( exists $subscription->{$key} ) { $config{ $subscription->{topic} }{$key} = $subscription->{$key}; } } } if ( not defined $outdir ) { die("Error: configuration must specify an output directory\n"); } if ( not defined $server ) { die("Error: configuration must specify a server\n"); } my %subscription; for my $name ( keys %config ) { $subscription{$name} = sub { my ( $topic, $message ) = @_; my $basedir = "${outdir}/${topic}"; $basedir =~ s{ / [^/]+ $ }{}x; if ( exists $config{$name}{norm_factor} ) { $message *= $config{$name}{norm_factor}; } if ( not -e $basedir ) { make_path($basedir); } write_file( "${outdir}/${topic}", $message ); }; if ( exists $config{$name}{freshness} and $name =~ m{ [*] }x ) { say STDERR "Warning: $name: freshness checks on subscriptions containing " . "an asterisk ('*') are not supported."; say STDERR " I will not check this topic's freshness"; delete $config{$name}{freshness}; } if ( exists $config{$name}{freshness} and $name =~ m{ [#] }x ) { say STDERR "Warning: $name: freshness checks on subscriptions containing " . "multi-level wildcards ('#') are not yet supported."; say STDERR " I will not check this topic's freshness"; delete $config{$name}{freshness}; } } sub check_freshness { my ( $topic, $now ) = @_; my $glob_expr = $topic; $glob_expr =~ s{ [+] }{*}gx; for my $file ( glob("${outdir}/${glob_expr}") ) { my $mtime = ( stat($file) )[9]; if ( -f $file and $now - $mtime > $config{$topic}{freshness} ) { unlink($file) or say STDERR "Unable to delete ${file} during freshness check: $!"; } } } my $mqtt = Net::MQTT::Simple->new($server); $mqtt->subscribe(%subscription); while (1) { $mqtt->tick(30); my $now = time(); for my $topic ( keys %config ) { if ( exists $config{$topic}{freshness} ) { check_freshness( $topic, $now ); } } } __END__ =head1 NAME mqttsyncdir-subscriber - map MQTT messages to filesystem entries =head1 SYNOPSIS B I =head1 VERSION 0.00 =head1 DESCRIPTION =head1 OPTIONS None yet. =head1 EXIT STATUS =head1 CONFIGURATION =head2 EXAMPLE CONFIGURATION =begin text The following configuration will subscribe to three topics, normalize message bodies of one of them and also remove stale data in two cases. directory: /srv/mqtt server: mqtt.example subscriptions: - topic: sensor/+/temperature freshness: 600 - topic: sensor/+/voltage freshness: 600 norm_factor: 0.000001 - topic: counter/whatever =end text This configuration file will subscribe to the topics C<< sensor/+/temperature >>, C<< sensor/+/voltage >>, and C<< counter/whatever >>. Each time a message is posted to one of them, the appropriate file in F will be updated. So, assuming the message C<< 12.5 >> is posted to C<< sensor/outdoor/temperature >>, the string C<< 12.5 >> will be written to F. The message C<< 10700000 >> to C<< sensor/main_battery/voltage >> will result in C<< 10.7 >> being written to F. Any file belonging to the topics C<< sensor/+/temperature >> or C<< sensor/+/voltage >> which did not receive an update in the last 10 minutes will be deleted. =head1 DEPENDENCIES =over =item * Net::MQTT::Simple =back =head1 BUGS AND LIMITATIONS =head1 AUTHOR Copyright (C) 2017-2018 by Daniel Friesel Ederf@finalrewind.orgE =head1 LICENSE 0. You just DO WHAT THE FUCK YOU WANT TO.