From 1e91d706b3c1e63d7f3df52a731664b881ac0a62 Mon Sep 17 00:00:00 2001 From: Daniel Friesel Date: Sat, 11 Nov 2017 15:37:20 +0100 Subject: Implement basic mqtt -> influxdb forwarder --- bin/mqtt-to-influxdb | 134 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 134 insertions(+) create mode 100755 bin/mqtt-to-influxdb diff --git a/bin/mqtt-to-influxdb b/bin/mqtt-to-influxdb new file mode 100755 index 0000000..652e53f --- /dev/null +++ b/bin/mqtt-to-influxdb @@ -0,0 +1,134 @@ +#!/usr/bin/env perl + +use strict; +use warnings; +use 5.020; + +our $VERSION = '0.00'; + +use File::Slurp qw(read_file); +use Net::MQTT::Simple; +use LWP::UserAgent; +use YAML::XS; + +sub load_config { + my ($config_file) = @_; + my $content = read_file($config_file); + my $yaml = Load($content); + return $yaml; +} + +my ($config_file) = @ARGV; + +if ( not defined $config_file ) { + die("Usage: $0 \n"); +} + +my $user_config = load_config($config_file); +my %config = ( + influxdb => $user_config->{influxdb}, + mqtt => $user_config->{mqtt}, +); + +for my $subscription ( @{ $user_config->{subscriptions} } ) { + $config{subscription}{ $subscription->{topic} } = {}; + for my $key (qw(attributes key norm_factor)) { + if ( exists $subscription->{$key} ) { + $config{subscription}{ $subscription->{topic} }{$key} = $subscription->{$key}; + } + } +} + +if ( not defined $config{mqtt}{server} ) { + die("Error: configuration must specify an mqtt server\n"); +} +if ( not defined $config{influxdb}{server} ) { + die("Error: configuration must specify an influxdb server\n"); +} +if ( not defined $config{influxdb}{database} ) { + die("Error: configuration must specify a database\n"); +} + +my $mqtt = Net::MQTT::Simple->new($config{mqtt}{server}); +my $ua = LWP::UserAgent->new(timeout => 10); +my $api_endpoint = sprintf('http://%s:%s/write?db=%s', + $config{influxdb}{server}, $config{influxdb}{port}, $config{influxdb}{database}); + +my %subscription; + +sub message_handler { + my ($subscription, $topic, $message) = @_; + + my @topic_parts = split(qr{ / }x, $topic); + my $key = $config{subscription}{$subscription}{key} // $topic; + my $norm_factor = $config{subscription}{$subscription}{norm_factor} // 1; + my $attributes = $config{subscription}{$subscription}{attributes} // {}; + my @message_and_tags = ($key); + + for my $attribute (keys %{$attributes}) { + my $attribute_value = $attributes->{$attribute}; + my $tag_key = $attribute; + my $tag_value = $attribute_value; + + if ($tag_value =~ m{ ^ \$ (? \d+ ) $ }x) { + $tag_value = $topic_parts[ $+{topic_index} - 1 ]; + } + + push(@message_and_tags, "${tag_key}=${tag_value}"); + } + + my $api_msg = sprintf('%s value=%f', + join(',', @message_and_tags), + $message * $norm_factor); + + say $api_msg; + + $ua->post($api_endpoint, Content => $api_msg); +} + +for my $name ( keys %{$config{subscription}} ) { + $subscription{$name} = sub { + my ($topic, $message) = @_; + message_handler($name, $topic, $message); + }; +} + +$mqtt->run(%subscription); + +__END__ + +=head1 NAME + +=head1 SYNOPSIS + +=head1 VERSION + +=head1 DESCRIPTION + +=head1 OPTIONS + +=over + +=back + +=head1 EXIT STATUS + +=head1 CONFIGURATION + +None. + +=head1 DEPENDENCIES + +=over + +=back + +=head1 BUGS AND LIMITATIONS + +=head1 AUTHOR + +Copyright (C) 2017 by Daniel Friesel Ederf@finalrewind.orgE + +=head1 LICENSE + + 0. You just DO WHAT THE FUCK YOU WANT TO. -- cgit v1.2.3