summaryrefslogtreecommitdiff
path: root/bin
diff options
context:
space:
mode:
authorDaniel Friesel <derf@finalrewind.org>2017-11-11 15:37:20 +0100
committerDaniel Friesel <derf@finalrewind.org>2017-11-11 15:37:20 +0100
commit1e91d706b3c1e63d7f3df52a731664b881ac0a62 (patch)
tree306524e5ce8539226cf4abb83e52ffa9293e47a7 /bin
Implement basic mqtt -> influxdb forwarderHEADmaster
Diffstat (limited to 'bin')
-rwxr-xr-xbin/mqtt-to-influxdb134
1 files changed, 134 insertions, 0 deletions
diff --git a/bin/mqtt-to-influxdb b/bin/mqtt-to-influxdb
new file mode 100755
index 0000000..652e53f
--- /dev/null
+++ b/bin/mqtt-to-influxdb
@@ -0,0 +1,134 @@
+#!/usr/bin/env perl
+
+use strict;
+use warnings;
+use 5.020;
+
+our $VERSION = '0.00';
+
+use File::Slurp qw(read_file);
+use Net::MQTT::Simple;
+use LWP::UserAgent;
+use YAML::XS;
+
+sub load_config {
+ my ($config_file) = @_;
+ my $content = read_file($config_file);
+ my $yaml = Load($content);
+ return $yaml;
+}
+
+my ($config_file) = @ARGV;
+
+if ( not defined $config_file ) {
+ die("Usage: $0 <config.yaml>\n");
+}
+
+my $user_config = load_config($config_file);
+my %config = (
+ influxdb => $user_config->{influxdb},
+ mqtt => $user_config->{mqtt},
+);
+
+for my $subscription ( @{ $user_config->{subscriptions} } ) {
+ $config{subscription}{ $subscription->{topic} } = {};
+ for my $key (qw(attributes key norm_factor)) {
+ if ( exists $subscription->{$key} ) {
+ $config{subscription}{ $subscription->{topic} }{$key} = $subscription->{$key};
+ }
+ }
+}
+
+if ( not defined $config{mqtt}{server} ) {
+ die("Error: configuration must specify an mqtt server\n");
+}
+if ( not defined $config{influxdb}{server} ) {
+ die("Error: configuration must specify an influxdb server\n");
+}
+if ( not defined $config{influxdb}{database} ) {
+ die("Error: configuration must specify a database\n");
+}
+
+my $mqtt = Net::MQTT::Simple->new($config{mqtt}{server});
+my $ua = LWP::UserAgent->new(timeout => 10);
+my $api_endpoint = sprintf('http://%s:%s/write?db=%s',
+ $config{influxdb}{server}, $config{influxdb}{port}, $config{influxdb}{database});
+
+my %subscription;
+
+sub message_handler {
+ my ($subscription, $topic, $message) = @_;
+
+ my @topic_parts = split(qr{ / }x, $topic);
+ my $key = $config{subscription}{$subscription}{key} // $topic;
+ my $norm_factor = $config{subscription}{$subscription}{norm_factor} // 1;
+ my $attributes = $config{subscription}{$subscription}{attributes} // {};
+ my @message_and_tags = ($key);
+
+ for my $attribute (keys %{$attributes}) {
+ my $attribute_value = $attributes->{$attribute};
+ my $tag_key = $attribute;
+ my $tag_value = $attribute_value;
+
+ if ($tag_value =~ m{ ^ \$ (?<topic_index> \d+ ) $ }x) {
+ $tag_value = $topic_parts[ $+{topic_index} - 1 ];
+ }
+
+ push(@message_and_tags, "${tag_key}=${tag_value}");
+ }
+
+ my $api_msg = sprintf('%s value=%f',
+ join(',', @message_and_tags),
+ $message * $norm_factor);
+
+ say $api_msg;
+
+ $ua->post($api_endpoint, Content => $api_msg);
+}
+
+for my $name ( keys %{$config{subscription}} ) {
+ $subscription{$name} = sub {
+ my ($topic, $message) = @_;
+ message_handler($name, $topic, $message);
+ };
+}
+
+$mqtt->run(%subscription);
+
+__END__
+
+=head1 NAME
+
+=head1 SYNOPSIS
+
+=head1 VERSION
+
+=head1 DESCRIPTION
+
+=head1 OPTIONS
+
+=over
+
+=back
+
+=head1 EXIT STATUS
+
+=head1 CONFIGURATION
+
+None.
+
+=head1 DEPENDENCIES
+
+=over
+
+=back
+
+=head1 BUGS AND LIMITATIONS
+
+=head1 AUTHOR
+
+Copyright (C) 2017 by Daniel Friesel E<lt>derf@finalrewind.orgE<gt>
+
+=head1 LICENSE
+
+ 0. You just DO WHAT THE FUCK YOU WANT TO.