brmd: A Case for POE

In brmlab, we want to track who is unlocking the space and whether someone is inside, have a good visual indicator that the live stream is on air, and so on. In other words, we have an Arduino with some further hardware, and we want to show whatever the Arduino reports on IRC and the web, and provide some web-based control (open/closed status override) in the opposite direction too.

What to use for a service (we call it brmd) that will bind all these interfaces together? It just needs a lot of boring frontends and simple state maintenance. It turns out that Perl’s POE framework is ideal for this – most of the code for IRC, HTTP and device read/write is already there, so you just grab the modules, slam them together and you have exactly what you need with minimal effort. Right?

It turns out that there are caveats. Basically, the idea is correct: had I not got stuck on a single stupidity of mine, I’d have had the whole thing put together in something like half an hour. Unfortunately, the moment you want robustness too, things get a lot more complex; to handle the device disappearing, IRC disconnections, HTTP socket fds leaking away, etc., you suddenly need to either magically know what further modules to hook up or start exerting some manual effort. Still, I like how easy POE makes it to give a simple state machine many input/output interfaces, and once you get used to the idiosyncrasies, you can even make it somewhat reliable.

Example POE code

While this task seems to be an ideal fit for POE, I’ve found surprisingly few examples of more complex POE component interaction on the web. Therefore, I’m going to lay out at least a condensed version of brmd below to help fellow future googlers. None of this is ground-breaking, but it should help a lot to have a template to go by. Our real version is somewhat more verbose and includes some more interfaces: brmdoor.git:brmd/brmd.pl

I assume that you already know what a “POE kernel” and a “POE session” are. Beware that I’m a POE beginner myself, and I haven’t gone through much effort to clean the code up the way I would if I were to work together with someone else on this. Some things surely aren’t solved optimally and you might even pick up a bad habit or two.

In order to have some neat separation, we will divide brmd into several components, each taking care of a single interface; the main package will only spawn them and do the most basic coordination. If we were to grow much larger, it would be worth the effort to set up some kind of message bus (I wish POE would provide that implicitly); here we just directly signal the components needing the info.

Main package

#!/usr/bin/perl
use strict;
use warnings;
use POE;
 
our $channel = "#brmlab";
our $streamurl = "http://nat.brmlab.cz:8090/brmstream.asf";
our $devdoor = $ARGV[0]; $devdoor ||= "/dev/serial/by-id/usb-FTDI_FT232R_USB_UART_A700e1qB-if00-port0";
 
our ($status, $streaming) = (0, 0); # state information
sub status_str { $status ? 'OPEN' : 'CLOSED'; }
sub streaming_str { $streaming ? 'ON AIR' : 'OFF AIR'; }

The prologue. After setting up our constraints, we set up some default variables (that should probably be set in the appropriate components instead) and set up our initial state; $status may be 1 if open, 0 if closed, $streaming analogously.

my $irc = brmd::IRC->new();
my $web = brmd::WWW->new();
my $door = brmd::Door->new();
my $stream = brmd::Stream->new();

We create our components. brmd::IRC will take care of notifying the IRC channel and updating its topic, brmd::WWW will serve some terse web pages on port 8088, brmd::Door will talk with the serial device where the Arduino is hooked up and brmd::Stream will take care of turning the streaming on/off (by ssh’ing to another machine with a camera and ffmpeg server).

POE::Session->create(
        package_states => [
                main => [ qw(_default _start
                                status_update streaming_update) ],
        ],
        heap => { irc => $irc, web => $web, door => $door, stream => $stream },
);
 
$poe_kernel->run();

Now, we create the main session. A POE session is an entity that has some state baggage and can receive and process arbitrary named events. _default and _start will be invoked by POE itself; status_update and streaming_update will be triggered by the components created above. Then, we start the whole machinery by handing control over to the POE kernel.

sub _default {
        my ($event, $args) = @_[ARG0 .. $#_];
        my @output = ( (scalar localtime), "main $event: " );
 
        for my $arg (@$args) {
                if ( ref $arg eq 'ARRAY' ) {
                        push( @output, '[' . join(', ', @$arg ) . ']' );
                }
                else {
                        push( @output, "'$arg'" );
                }
        }
        print join ' ', @output, "\n";
}

Boring code stolen from some example will make sure that we see unhandled events arriving. Helps to catch typos and also general debugging. We reuse this code in our other components too, but I will skip it in this post for brevity.

sub _start {
        $poe_kernel->post($_[HEAP]->{web}, 'register');
        $poe_kernel->post($_[HEAP]->{door}, 'register');
}

When our session is created, we will send an event to web and door so that they can send us back some interesting events.

sub status_update {
        my ($self, $newstatus, $manual, $nick) = @_[OBJECT, ARG0 .. ARG2];
        $status = $newstatus;
 
        $manual and $poe_kernel->post($door, 'status_override', $status);
        $poe_kernel->post( $irc, 'notify_update', 'brmstatus', status_str(), undef, $manual, $nick );
}
 
sub streaming_update {
        my ($self, $newstreaming) = @_[OBJECT, ARG0];
        $streaming = $newstreaming;
 
        $poe_kernel->post( $stream, $streaming ? 'stream_start' : 'stream_stop' );
        $poe_kernel->post( $irc, 'notify_update', 'brmvideo', streaming_str(), $streaming ? $streamurl : undef );
}

Now finally some event handlers! These are called by web or door when someone flips the right switch or clicks a button. Aside from updating the state variable, they will also notify the appropriate components.

Stream component

Component time now! Let’s start with the simplest:

package brmd::Stream;
 
use POE;
 
sub new {
        my $class = shift;
        my $self = bless { }, $class;
 
        POE::Session->create(
                object_states => [
                        $self => [ qw(_start _default
                                        stream_start stream_stop) ],
                ],
        );
        return $self;
}
 
sub _start {
        $_[KERNEL]->alias_set("$_[OBJECT]");
}
 
sub stream_switch { system('ssh brmstream@brmvid "echo '.($_[0]?'START':'STOP').' >/tmp/brmstream"'); }
sub stream_start { stream_switch(1); }
sub stream_stop { stream_switch(0); }
 
1;

This is the component taking care of turning the live stream on/off. We can just throw the package definition into the same Perl file; the new method setting up the object will also set up a separate session for our component, which handles the stream_start and stream_stop events, both implemented using a stream_switch() helper. So we just post the appropriate event to the component when we want to switch the stream on/off.

The alias_set() in _start registers the stringified object reference as a session alias, so that we can do $poe_kernel->post($objectreference, 'eventname', …).
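To see why passing the object works as a destination, here is a minimal sketch in plain Perl (no POE involved). Demo::Stream and %alias_table are hypothetical stand-ins; the hash only imitates a name-to-session lookup and is not meant to show how POE stores aliases internally. It illustrates that a blessed reference interpolates to a string of the form Class=HASH(0x…), and that the same string comes out again whenever the reference is used where a string is expected.

```perl
#!/usr/bin/perl
use strict;
use warnings;

package Demo::Stream;                 # hypothetical stand-in for brmd::Stream
sub new { return bless {}, shift }

package main;

my $stream = Demo::Stream->new();

# Interpolating a blessed reference yields "Class=HASH(0xADDRESS)".
my $alias = "$stream";
print "alias: $alias\n";

# Stand-in for an alias table: the reference is stringified when used
# as a hash key, so the object itself finds the entry again.
my %alias_table = ( $alias => 'session for the stream component' );
print "lookup by object: $alias_table{$stream}\n";
```

The string stays the same for as long as the object is alive, which is all the components here need, since they are created once at startup and never destroyed.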

Door component

Let’s take a look at a more complex component – the serial interface:

package brmd::Door;
 
use POE qw(Wheel::ReadWrite Filter::Line);
use Symbol qw(gensym);
use Device::SerialPort;
 
sub new {
        my $class = shift;
        my $self = bless { }, $class;
 
        POE::Session->create(
                object_states => [
                        $self => [ qw(_start _default register
                                        serial_input serial_error
                                        status_override) ],
                ],
        );
        return $self;
}
 
sub serial_open {
        my ($device) = @_;
        # Open a serial port, and tie it to a file handle for POE.
        my $handle = gensym();
        my $port = tie(*$handle, "Device::SerialPort", $device);
        die "can't open port: $!" unless $port;
        $port->datatype('raw');
        $port->baudrate(9600);
        $port->databits(8);
        $port->parity("none");
        $port->stopbits(1);
        $port->handshake("none");
        $port->write_settings();
        return $handle;
}
 
sub _start {
        $_[KERNEL]->alias_set("$_[OBJECT]");
 
        $_[HEAP]->{serial} = POE::Wheel::ReadWrite->new(
                Handle => serial_open($devdoor),
                Filter => POE::Filter::Line->new(
                        InputLiteral  => "\x0A",    # Received line endings.
                        OutputLiteral => "\x0A",    # Sent line endings.
                        ),
                InputEvent => "serial_input",
                ErrorEvent => "serial_error",
        ) or die "Door fail: $!";
}
 
sub register {
        my ($self, $sender) = @_[OBJECT, SENDER];
        my $sid = $sender->ID;
        $poe_kernel->refcount_increment($sid, 'observer_door'); # XXX: No decrement
        push (@{$self->{'observers'}}, $sid);
}

Wow! Well, here we have a much more complex component. It can get events from various sources and includes a wheel, which is the way POE does I/O asynchronously. After _start sets up the wheel, we will be getting serial_input and serial_error events from the wheel, and status_override events from the main session. We need to attach the wheel to a file handle; serial_open() takes care of that for a serial device.

Our component will of course be dispatching events in case something interesting comes over the serial line; it will send them to anyone who calls register first, by simply iterating over a list of event targets. We need to bump the refcount, otherwise a session that has no I/O and no refcounts will get garbage-collected the first thing after starting the POE kernel (that would happen to our main session!).

sub serial_input {
        my ($self, $input) = @_[OBJECT, ARG0];
        print ((scalar localtime)." $input\n");
        $input =~ /^(\d) (\d) (.*)$/ or return;
        my ($cur_status, $cur_streaming, $brm) = ($1, $2, $3);
        if ($cur_status != $status) {
                foreach (@{$self->{observers}}) {
                        $poe_kernel->post($_, 'status_update', $cur_status);
                }
        }
        if ($cur_streaming != $streaming) {
                foreach (@{$self->{observers}}) {
                        $poe_kernel->post($_, 'streaming_update', $cur_streaming);
                }
        }
        if ($brm =~ s/^CARD //) {
                print "from door: $input\n";
                if ($brm =~ /^UNKNOWN/) {
                        $poe_kernel->post( $irc, 'notify_door_unauth' );
                } else {
                        $poe_kernel->post( $irc, 'notify_door_unlocked', $brm );
                }
        }
}
 
sub serial_error {
        my ($heap) = ($_[HEAP]);
        print "$_[ARG0] error $_[ARG1]: $_[ARG2]\n";
}

The serial protocol is trivial; every second, the Arduino sends a line that shows state of the status and stream flips, and possibly a ‘CARD’ string with nickname of whoever unlocked the door using their RFID card. Our handler will check if state of either flip changed, broadcasting events appropriately, and possibly posting a notification about (attempted) door unlock on IRC.
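The parsing step can be tried out on its own. The following is a sketch only: parse_door_line() is a hypothetical helper that reuses the regular expressions from serial_input, and the sample lines (including the nick alice) are made up to match those expressions rather than captured from a real Arduino.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Parse one status line into a hash reference; returns nothing if the
# line does not have the "<status> <streaming> <rest>" shape.
sub parse_door_line {
    my ($line) = @_;
    $line =~ /^(\d) (\d) (.*)$/ or return;
    my %report = ( status => $1, streaming => $2 );
    my $rest = $3;
    if ($rest =~ s/^CARD //) {
        # As in the handler: UNKNOWN marks a card that was rejected.
        if ($rest =~ /^UNKNOWN/) { $report{unauthorized} = 1; }
        else                     { $report{unlocked_by}  = $rest; }
    }
    return \%report;
}

# Hypothetical sample lines; note the trailing space in the first one.
for my $line ('1 0 ', '1 0 CARD alice', '0 0 CARD UNKNOWN', 'garbage') {
    my $r = parse_door_line($line);
    if (!$r) { print "ignored: '$line'\n"; next; }
    print join(', ', map { "$_=$r->{$_}" } sort keys %$r), "\n";
}
```

One detail worth noticing is that the pattern requires a space after the second digit, so a line consisting of just the two flags with no trailing space would be silently ignored.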

sub status_override {
        my ($heap, $status) = @_[HEAP, ARG0];
        my $serial = $heap->{serial};
        $serial->put('s'.$status);
        $serial->flush();
}

On the other hand, this event may come from the main session, and it means that the status flip state shall be overridden (someone forgot to flip it when leaving). The Arduino is notified as it needs to turn off the LED near the switch.

IRC component

POE::Component::IRC has been one of the showcases of POE. Here, we will wrap this component in another session that will provide an interface to the rest of brmd.

package brmd::IRC;
 
use POE qw(Component::IRC Component::IRC::Plugin::Connector);
 
sub new {
        my $class = shift;
        my $self = bless { }, $class;
 
        my $irc = POE::Component::IRC->spawn(
                nick => 'brmbot',
                ircname => 'The Brmitron',
                server => 'irc.freenode.org',
        ) or die "IRC fail: $!";
        my $connector = POE::Component::IRC::Plugin::Connector->new();
 
        POE::Session->create(
                object_states => [
                        $self => [ qw(_start _default
                                        irc_001 irc_public irc_332 irc_topic
                                        notify_update
                                        notify_door_unauth notify_door_unlocked) ],
                ],
                heap => { irc => $irc, connector => $connector },
        );
        return $self;
}
 
sub _start {
        $_[KERNEL]->alias_set("$_[OBJECT]");
        my $irc = $_[HEAP]->{irc};
        $irc->yield( register => 'all' );
        $irc->plugin_add( 'Connector' => $_[HEAP]->{connector} );
        $irc->yield( connect => { } );
}
 
sub irc_001 {
        my $sender = $_[SENDER];
        my $irc = $sender->get_heap();
        print "Connected to ", $irc->server_name(), "\n";
        $irc->yield( join => $channel );
}

Even more complex initialization. We first initialize the POE::Component::IRC object, then our own session that will handle incoming/outgoing events, then in _start we tie together our session and the POE::Component::IRC object. The Connector plugin will take care of reconnects to the IRC server in case our connection is interrupted.

The irc_-prefixed events are updates coming from POE::Component::IRC, while notify_-prefixed events are commands from other brmd components to update the channel.
irc_001 means we have connected successfully and we can join our channel.

our $topic; BEGIN { $topic = 'BRMLAB OPEN'; }
 
sub topic_update {
        my ($irc) = @_;
        my $newtopic = $topic;
        if ($status) { $newtopic =~ s/BRMLAB CLOSED/BRMLAB OPEN/g; } else { $newtopic =~ s/BRMLAB OPEN/BRMLAB CLOSED/g; }
        if ($streaming) { $newtopic =~ s#OFF AIR#ON AIR ($streamurl)#g; } else { $newtopic =~ s#ON AIR.*? \|#OFF AIR |#g; }
        if ($newtopic ne $topic) {
                $topic = $newtopic;
                $irc->yield (topic => $channel => $topic );
        }
}
 
sub irc_332 {
        my ($sender, $server, $str, $data) = @_[SENDER, ARG0 .. ARG2];
        $topic = $data->[1]; print "new topic: $topic\n";
        topic_update($_[HEAP]->{irc});
}
 
sub irc_topic {
        my ($sender, $who, $where, $what) = @_[SENDER, ARG0 .. ARG2];
        $topic = $what; print "new topic: $topic\n"; 
        topic_update($_[HEAP]->{irc});
}

The best source for current state information is just peeking at the topic. These handlers will keep the topic up to date.
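The topic rewriting can be exercised without an IRC connection. This is a sketch under assumptions: rewrite_topic() is a hypothetical pure-function variant of topic_update that takes the state as arguments instead of reading globals, and the topic layout and stream URL are invented for the example.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Return the topic adjusted to the given state, using the same
# substitutions as topic_update but without touching any globals.
sub rewrite_topic {
    my ($topic, $status, $streaming, $streamurl) = @_;
    if ($status) { $topic =~ s/BRMLAB CLOSED/BRMLAB OPEN/g; }
    else         { $topic =~ s/BRMLAB OPEN/BRMLAB CLOSED/g; }
    if ($streaming) { $topic =~ s#OFF AIR#ON AIR ($streamurl)#g; }
    else            { $topic =~ s#ON AIR.*? \|#OFF AIR |#g; }
    return $topic;
}

my $url   = 'http://stream.example/live.asf';       # hypothetical URL
my $topic = 'BRMLAB OPEN | OFF AIR | brmlab.cz';    # hypothetical topic layout

my $on  = rewrite_topic($topic, 0, 1, $url);   # closed, stream started
my $off = rewrite_topic($on,    0, 0, $url);   # closed, stream stopped
print "$on\n$off\n";
```

Note that the substitution for turning the stream off relies on a " |" separator following the stream segment; a topic that ends right after the ON AIR part would be left unchanged.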

N.B. we do not initialize $topic within the declaration! I spent quite a bit of time on this one. Since we are defining packages within a single brmd.pl file, we need to realize that Perl executes statements sequentially within a single file; since $poe_kernel->run() in the main package keeps control for as long as brmd is running, a plain assignment this far down the file would not be executed while brmd is running. BEGIN{} will make sure the assignment is performed before execution of main starts.
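The effect is easy to reproduce in a few lines of plain Perl. In this sketch the package Demo::IRC and its two variables are hypothetical; the code at the top plays the role of the main package and looks at both variables before control has reached the lower part of the file.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# "Main" code: runs first, like the top of brmd.pl. In brmd, the call to
# $poe_kernel->run() sits up here and keeps control while the daemon runs.
my $plain_seen = Demo::IRC::plain_topic();
my $begin_seen = Demo::IRC::begin_topic();
print 'plain: ', (defined $plain_seen ? $plain_seen : '(undef)'), "\n";
print "BEGIN: $begin_seen\n";

package Demo::IRC;                     # hypothetical package name

# Run-time assignment: not executed yet when the code above calls us.
our $plain_topic = 'BRMLAB OPEN';

# Compile-time assignment: done before any run-time code starts.
our $begin_topic; BEGIN { $begin_topic = 'BRMLAB OPEN'; }

sub plain_topic { return $plain_topic }
sub begin_topic { return $begin_topic }

package main;
```

The subroutines are available early because sub definitions are processed at compile time; only the plain assignment has to wait until execution reaches its line.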

sub notify_update {
        my ($sender, $comp, $status, $extra, $manual, $nick) = @_[SENDER, ARG0 .. ARG4];
        my $irc = $_[HEAP]->{irc};
        my $msg = "[$comp] update: \002$status\002";
        $extra and $msg .= " $extra";
        $manual and $msg .= " ($manual manual override by $nick)";
        $irc->yield (privmsg => $channel => $msg );
        topic_update($irc);
}
 
sub notify_door_unauth {
        my ($sender) = $_[SENDER];
        my $irc = $_[HEAP]->{irc};
        my $msg = "[door] unauthorized access denied!";
        $irc->yield (privmsg => $channel => $msg );
}
 
sub notify_door_unlocked {
        my ($sender, $nick) = @_[SENDER, ARG0];
        my $irc = $_[HEAP]->{irc};
        my $msg = "[door] unlocked by: \002$nick";
        $irc->yield (privmsg => $channel => $msg );
}

And these events take care of dispatching notifications. In case of notify_update, the global state changed and we will also update the topic if appropriate. At any rate, we will send a message to the channel with description of what is going on, formatted appropriately for the IRC medium.

WWW component

The last piece of the puzzle is the web interface that will change based on the current state and allow overriding it.

package brmd::WWW;
 
use POE qw(Component::Server::HTTP);
use HTTP::Status qw/RC_OK/;
use CGI;
 
sub new {
        my $class = shift;
        my $self = bless { }, $class;
 
        my $web = POE::Component::Server::HTTP->new(
                Port           => 8088,
                ContentHandler => {
                        "/brmstatus.txt" => sub { $self->web_brmstatus_txt(@_) },
                        "/brmstatus-switch" => sub { $self->web_brmstatus_switch(@_) },
                        "/" => \&web_index
                },
                Headers => {Server => 'brmd/xxx'},
        ) or die "WWW fail: $!";
 
        POE::Session->create(
                object_states => [
                        $self => [ qw(_start _default register) ],
                ],
                heap => { web => $web },
        );
 
        return $self;
}
 
sub _start {
        $_[KERNEL]->alias_set("$_[OBJECT]");
}

Unfortunately, the POE::Component::Server::HTTP component is rather unPOE-ish. It would be much nicer if accessing various URLs triggered proper events in our session instead of directly calling subroutines. Oh well…

sub disable_caching {
        my ($response) = @_;
        $response->push_header("Cache-Control", "no-cache, must-revalidate");
        $response->push_header("Expires", "Sat, 26 Jul 1997 05:00:00 GMT");
}
 
sub web_index {
        my ($request, $response) = @_;
 
        my $sts = main::status_str();
        my $str = main::streaming_str();
        my $r_link = $streaming ? "| $streamurl - watch now!" : '';
 
        $response->protocol("HTTP/1.1");
        $response->code(RC_OK);
        $response->push_header("Content-Type", "text/plain");
        disable_caching($response);
 
        $response->content(<<EOT
brmd web interface
brmstatus ($sts) | brmstatus.txt status page
brmvideo ($str) $r_link
EOT
        );
 
        return RC_OK;
}
 
sub web_brmstatus_txt {
        my ($self, $request, $response) = @_;
 
        my $st = main::status_str();
 
        $response->protocol("HTTP/1.1");
        $response->code(RC_OK);
        $response->push_header("Content-Type", "text/plain");
        disable_caching($response);
        $response->content($st);
 
        return RC_OK;
}
 
sub web_brmstatus_switch {
        my ($self, $request, $response) = @_;
 
        my $q = CGI->new($request->content);
        my $nick = $q->param('nick');
 
        my $newstatus = $status ? 0 : 1; # numeric 0/1; "not $status" would yield '' instead of 0
        foreach (@{$self->{observers}}) {
                $poe_kernel->post($_, 'status_update', $newstatus, 'web', $nick);
        }
 
        $response->protocol("HTTP/1.1");
        $response->code(302);
        $response->header('Location' => 'brmstatus.txt');
 
        return RC_OK;
}

I think there isn’t much to talk about. Of course, in reality these are HTML files instead of text files; I just made it as brief as possible. :-)

So, enjoy POEing!
