- Using MongoDB as a storage of callback list and buffer
- Using HTTP as push message
- First, a subscriber requests to /sub with key which is wanted.
- When a publisher fired a event about a key, it requests to /pub with the key with a
message - This message queue server receives a pub request and turns over the message to subscribers by HTTP.
This is only for internal usage of web application. There are no authentications.
package Buspub; use strict; use warnings; sub route ($$); use Router::Simple; use Tie::IxHash; use AnyEvent::HTTP; use HTTP::Request::Common; use JSON::XS; use POSIX (); use MongoDB; my $connection = MongoDB::Connection->new(host => 'localhost', port => 27017); my $subscriptions = $connection->get_database('pubsub')->subscriptions; $subscriptions->ensure_index(Tie::IxHash->new( key => 1, created => -1 ), { background => 1 }); route '/sub' => { action => sub { my ($r) = @_; my $key = $r->req->param('key') or return $r->json({ error => "key required"});; my $callback = $r->req->param('callback') or return $r->json({ error => "callback required"});; my $id = $subscriptions->insert({ key => $key, messages => [], callback => $callback, max => 25, created => scalar time(), }); LOG("SUB:: %s with callback: %s", $key, $callback); $r->json({ id => "$id" }); } }; route '/pub' => { action => sub { my ($r) = @_; my $key = $r->req->param('key') or return $r->json({ error => "key required"});; my $message = decode_json $r->req->param('message'); my $cursor = $subscriptions->query({ key => $key }); my %count; while (my $obj = $cursor->next) { $count{all}++; my $id = $obj->{_id} . ""; my $uri = $obj->{callback}; my $messages = [ @{$obj->{messages}}, $message ]; my $req = POST $uri, [ id => $id, messages => encode_json($messages) ]; LOG("PUB:: => %s => %s with %d messages", $key, $req->uri, scalar @$messages); http_request $req->method => $req->uri, body => $req->content, headers => { map { $_ => $req->header($_), } $req->headers->header_field_names }, timeout => 20, sub { my ($body, $headers) = @_; LOG("PUB:: <= %s <= %s with status:%d", $key, $req->uri, $headers->{Status}); if ($headers->{Status} =~ /^2/) { $subscriptions->update({ _id => $obj->{_id} }, { '$pullAll' => { messages => $obj->{messages} } }); } elsif ($headers->{Status} =~ /^4/) { $subscriptions->remove({ _id => $obj->{_id} }); } elsif ($headers->{Status} =~ /^5/) { if (@$messages > $obj->{max}) { $subscriptions->remove({ _id => $obj->{_id} }); } else { $subscriptions->update({ _id => $obj->{_id} }, { '$push' => { messages => $message } }); } } } ; } $r->json({ key => $key, delivered => \%count }); } }; route '/test/callback' => { action => sub { my ($r) = @_; use Data::Dumper; warn Dumper $r->req->param('id') ; warn Dumper decode_json $r->req->param('messages') ; $r->res->status($r->req->param('code') || 404); $r->res->content_type('application/json; charset=utf8'); $r->res->content('{}'); $r->res->finalize; } }; BEGIN { my $router = Router::Simple->new; sub route ($$) { $router->connect(@_) }; sub run { my ($env) = @_; if ( my $handler = $router->match($env) ) { my $c = Buspub::Context->new($env); $handler->{action}->($c); } else { [ 404, [ 'Content-Type' => 'text/html' ], ['Not Found'] ]; } } sub LOG { my ($message, @args) = @_; print sprintf("[%s] $message", POSIX::strftime("%Y-%m-%d %H:%M:%S", localtime), @args), "\n"; } }; package Buspub::Request; use parent qw(Plack::Request); package Buspub::Response; use parent qw(Plack::Response); package Buspub::Context; use JSON::XS; sub new { my ($class, $env) = @_; bless { req => Buspub::Request->new($env), res => Buspub::Response->new(200), }, $class; } sub req { $_[0]->{req} } sub res { $_[0]->{res} } sub json { my ($self, $vars) = @_; my $body = JSON::XS->new->ascii(1)->encode($vars); $self->res->content_type('application/json; charset=utf8'); $self->res->content($body); $self->res->finalize; } \&Buspub::run;
[[perl]] [[http]] [[webhook]] [[messagequeue]]