- Using MongoDB as a storage of callback list and buffer
- Using HTTP as push message
- First, a subscriber requests to /sub with key which is wanted.
- When a publisher fired a event about a key, it requests to /pub with the key with a message
- This message queue server receives a pub request and turns over the message to subscribers by HTTP.
This is only for internal usage of web application. There are no authentications.
package Buspub;
use strict;
use warnings;
sub route ($$);
use Router::Simple;
use Tie::IxHash;
use AnyEvent::HTTP;
use HTTP::Request::Common;
use JSON::XS;
use POSIX ();
use MongoDB;
my $connection = MongoDB::Connection->new(host => 'localhost', port => 27017);
my $subscriptions = $connection->get_database('pubsub')->subscriptions;
$subscriptions->ensure_index(Tie::IxHash->new( key => 1, created => -1 ), { background => 1 });
route '/sub' => {
action => sub {
my ($r) = @_;
my $key = $r->req->param('key') or return $r->json({ error => "key required"});;
my $callback = $r->req->param('callback') or return $r->json({ error => "callback required"});;
my $id = $subscriptions->insert({
key => $key,
messages => [],
callback => $callback,
max => 25,
created => scalar time(),
});
LOG("SUB:: %s with callback: %s", $key, $callback);
$r->json({ id => "$id" });
}
};
route '/pub' => {
action => sub {
my ($r) = @_;
my $key = $r->req->param('key') or return $r->json({ error => "key required"});;
my $message = decode_json $r->req->param('message');
my $cursor = $subscriptions->query({ key => $key });
my %count;
while (my $obj = $cursor->next) {
$count{all}++;
my $id = $obj->{_id} . "";
my $uri = $obj->{callback};
my $messages = [ @{$obj->{messages}}, $message ];
my $req = POST $uri, [ id => $id, messages => encode_json($messages) ];
LOG("PUB:: => %s => %s with %d messages", $key, $req->uri, scalar @$messages);
http_request $req->method => $req->uri,
body => $req->content,
headers => {
map { $_ => $req->header($_), } $req->headers->header_field_names
},
timeout => 20,
sub {
my ($body, $headers) = @_;
LOG("PUB:: <= %s <= %s with status:%d", $key, $req->uri, $headers->{Status});
if ($headers->{Status} =~ /^2/) {
$subscriptions->update({ _id => $obj->{_id} }, { '$pullAll' => { messages => $obj->{messages} } });
} elsif ($headers->{Status} =~ /^4/) {
$subscriptions->remove({ _id => $obj->{_id} });
} elsif ($headers->{Status} =~ /^5/) {
if (@$messages > $obj->{max}) {
$subscriptions->remove({ _id => $obj->{_id} });
} else {
$subscriptions->update({ _id => $obj->{_id} }, { '$push' => { messages => $message } });
}
}
}
;
}
$r->json({ key => $key, delivered => \%count });
}
};
route '/test/callback' => {
action => sub {
my ($r) = @_;
use Data::Dumper;
warn Dumper $r->req->param('id') ;
warn Dumper decode_json $r->req->param('messages') ;
$r->res->status($r->req->param('code') || 404);
$r->res->content_type('application/json; charset=utf8');
$r->res->content('{}');
$r->res->finalize;
}
};
BEGIN {
my $router = Router::Simple->new;
sub route ($$) { $router->connect(@_) };
sub run {
my ($env) = @_;
if ( my $handler = $router->match($env) ) {
my $c = Buspub::Context->new($env);
$handler->{action}->($c);
} else {
[ 404, [ 'Content-Type' => 'text/html' ], ['Not Found'] ];
}
}
sub LOG {
my ($message, @args) = @_;
print sprintf("[%s] $message", POSIX::strftime("%Y-%m-%d %H:%M:%S", localtime), @args), "\n";
}
};
package Buspub::Request;
use parent qw(Plack::Request);
package Buspub::Response;
use parent qw(Plack::Response);
package Buspub::Context;
use JSON::XS;
sub new {
my ($class, $env) = @_;
bless {
req => Buspub::Request->new($env),
res => Buspub::Response->new(200),
}, $class;
}
sub req { $_[0]->{req} }
sub res { $_[0]->{res} }
sub json {
my ($self, $vars) = @_;
my $body = JSON::XS->new->ascii(1)->encode($vars);
$self->res->content_type('application/json; charset=utf8');
$self->res->content($body);
$self->res->finalize;
}
\&Buspub::run;
[[perl]] [[http]] [[webhook]] [[messagequeue]]