Niro* by cho45

#17 pub/sub message queue with HTTP, simple implementation on MongoDB

  • Using MongoDB as a storage of callback list and buffer
  • Using HTTP as push message
  1. First, a subscriber requests to /sub with key which is wanted.
  2. When a publisher fired a event about a key, it requests to /pub with the key with a message
  3. This message queue server receives a pub request and turns over the message to subscribers by HTTP.

This is only for internal usage of web application. There are no authentications.

package Buspub;

use strict;
use warnings;
sub route ($$);
use Router::Simple;
use Tie::IxHash;
use AnyEvent::HTTP;
use HTTP::Request::Common;
use JSON::XS;
use POSIX ();

use MongoDB;
my $connection = MongoDB::Connection->new(host => 'localhost', port => 27017);
my $subscriptions = $connection->get_database('pubsub')->subscriptions;
$subscriptions->ensure_index(Tie::IxHash->new( key => 1, created => -1 ), { background => 1 });

route '/sub' => {
	action => sub {
		my ($r) = @_;
		my $key = $r->req->param('key') or return $r->json({ error => "key required"});;
		my $callback = $r->req->param('callback') or return $r->json({ error => "callback required"});;
		my $id = $subscriptions->insert({
			key      => $key,
			messages => [],
			callback => $callback,
			max      => 25,
			created  => scalar time(),
		});
		LOG("SUB:: %s with callback: %s", $key, $callback);
		$r->json({ id => "$id" });
	}
};

route '/pub' => {
	action => sub {
		my ($r) = @_;
		my $key = $r->req->param('key') or return $r->json({ error => "key required"});;
		my $message = decode_json $r->req->param('message');

		my $cursor = $subscriptions->query({ key => $key });
		my %count;
		while (my $obj = $cursor->next) {
			$count{all}++;
			my $id  = $obj->{_id} . "";
			my $uri = $obj->{callback};
			my $messages = [ @{$obj->{messages}}, $message ];
			my $req = POST $uri, [ id => $id, messages => encode_json($messages) ];

			LOG("PUB:: => %s => %s with %d messages", $key, $req->uri, scalar @$messages);
			http_request $req->method => $req->uri,
				body => $req->content,
				headers => {
					map { $_ => $req->header($_), } $req->headers->header_field_names
				},
				timeout => 20,
				sub {
					my ($body, $headers) = @_;
					LOG("PUB:: <= %s <= %s with status:%d", $key, $req->uri, $headers->{Status});
					if ($headers->{Status} =~ /^2/) {
						$subscriptions->update({ _id => $obj->{_id} }, { '$pullAll' => { messages => $obj->{messages} } });
					} elsif ($headers->{Status} =~ /^4/) {
						$subscriptions->remove({ _id => $obj->{_id} });
					} elsif ($headers->{Status} =~ /^5/) {
						if (@$messages > $obj->{max}) {
							$subscriptions->remove({ _id => $obj->{_id} });
						} else {
							$subscriptions->update({ _id => $obj->{_id} }, { '$push' => { messages => $message } });
						}
					}
				}
			;
		}
		$r->json({ key => $key, delivered => \%count });
	}
};

route '/test/callback' =>  {
	action => sub {
		my ($r) = @_;
		use Data::Dumper;
		warn Dumper $r->req->param('id') ;
		warn Dumper decode_json $r->req->param('messages') ;
		$r->res->status($r->req->param('code') || 404);
		$r->res->content_type('application/json; charset=utf8');
		$r->res->content('{}');
		$r->res->finalize;
	}
};


BEGIN {
	my $router = Router::Simple->new;
	sub route ($$) { $router->connect(@_) };

	sub run {
		my ($env) = @_;
		if ( my $handler = $router->match($env) ) {
			my $c = Buspub::Context->new($env);
			$handler->{action}->($c);
		} else {
			[ 404, [ 'Content-Type' => 'text/html' ], ['Not Found'] ];
		}
	}

	sub LOG {
		my ($message, @args) = @_;
		print sprintf("[%s] $message", POSIX::strftime("%Y-%m-%d %H:%M:%S", localtime), @args), "\n";
	}
};

package Buspub::Request;
use parent qw(Plack::Request);

package Buspub::Response;
use parent qw(Plack::Response);

package Buspub::Context;
use JSON::XS;

sub new {
	my ($class, $env) = @_;
	bless {
		req => Buspub::Request->new($env),
		res => Buspub::Response->new(200),
	}, $class;
}

sub req { $_[0]->{req} }
sub res { $_[0]->{res} }

sub json {
	my ($self, $vars) = @_;
	my $body = JSON::XS->new->ascii(1)->encode($vars);
	$self->res->content_type('application/json; charset=utf8');
	$self->res->content($body);
	$self->res->finalize;
}

\&Buspub::run;

[[perl]] [[http]] [[webhook]] [[messagequeue]]

login