NAME

MangoX::Queue - A MongoDB queue implementation using Mango

DESCRIPTION

MangoX::Queue is a MongoDB-backed queue implementation built on Mango. It supports both blocking and non-blocking use.

MangoX::Queue makes no attempt to handle the Mango connection, database or collection - pass in a collection to the constructor and MangoX::Queue will use it. The collection can be plain, capped or sharded.

SYNOPSIS

use DateTime; # only needed for the 'created' option shown below
use Mango;
use MangoX::Queue;

my $mango = Mango->new("mongodb://localhost:27017");
my $collection = $mango->db('my_db')->collection('my_queue');

my $queue = MangoX::Queue->new(collection => $collection);

# To add a job
my $id = enqueue $queue 'test'; # Blocking
enqueue $queue 'test' => sub { my $id = shift; }; # Non-blocking

# To set options
my $id = enqueue $queue priority => 1, created => DateTime->now, 'test'; # Blocking
enqueue $queue priority => 1, created => DateTime->now, 'test' => sub { my $id = shift; }; # Non-blocking

# To watch for a specific job status
watch $queue $id, 'Complete'; # Blocking
watch $queue $id, 'Complete' => sub { # Non-blocking
	# Job status is 'Complete'
};

# To fetch a job
my $job = fetch $queue; # Blocking
fetch $queue sub { # Non-blocking
	my ($job) = @_;
	# ...
};

# To get a job by id
my $job = get $queue $id; # Blocking
get $queue $id => sub { my $job = shift; }; # Non-blocking

# To requeue a job
my $id = requeue $queue $job; # Blocking
requeue $queue $job => sub { my $id = shift; }; # Non-blocking

# To dequeue a job
dequeue $queue $id; # Blocking
dequeue $queue $id => sub { }; # Non-blocking

# To consume a queue
while(my $job = consume $queue) { # Blocking
	# ...
}
my $consumer = consume $queue sub { # Non-blocking
	my ($job) = @_;
	# ...
};

# To stop consuming a queue
release $queue $consumer;

# To listen for events
on $queue enqueued => sub { my ($queue, $job) = @_; };
on $queue dequeued => sub { my ($queue, $job) = @_; };
on $queue consumed => sub { my ($queue, $job) = @_; };

# To register a plugin
plugin $queue 'MangoX::Queue::Plugin::Statsd';

ATTRIBUTES

MangoX::Queue implements the following attributes.

collection

my $collection = $queue->collection;
$queue->collection($mango->db('foo')->collection('bar'));

my $queue = MangoX::Queue->new(collection => $collection);

The Mango::Collection representing the MongoDB queue collection.

delay

my $delay = $queue->delay;
$queue->delay(MangoX::Queue::Delay->new);

The MangoX::Queue::Delay responsible for dynamically controlling the delay between queue queries.
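
One common way to vary a polling delay is to lengthen it while the queue stays empty and to reset it as soon as a job turns up. The sketch below illustrates that general idea only; it is an assumption about the technique, not the MangoX::Queue::Delay API. The package Sketch::Backoff, its start, increment and maximum parameters, and the methods next_delay and reset_delay are all hypothetical names.

```perl
use strict;
use warnings;

# Hypothetical polling-delay helper; NOT the MangoX::Queue::Delay API.
package Sketch::Backoff;

sub new {
    my ($class, %args) = @_;
    my $self = {
        start     => $args{start}     // 1,   # seconds, assumed value
        increment => $args{increment} // 1,   # assumed value
        maximum   => $args{maximum}   // 10,  # assumed value
    };
    $self->{current} = $self->{start};
    return bless $self, $class;
}

# Return the delay to sleep now, then grow it for the next empty poll.
sub next_delay {
    my ($self) = @_;
    my $delay = $self->{current};
    my $next  = $delay + $self->{increment};
    $self->{current} = $next > $self->{maximum} ? $self->{maximum} : $next;
    return $delay;
}

# Call this when a job was found, so polling becomes frequent again.
sub reset_delay {
    my ($self) = @_;
    $self->{current} = $self->{start};
    return $self;
}

package main;

my $backoff = Sketch::Backoff->new(start => 1, increment => 2, maximum => 4);
my @delays  = map { $backoff->next_delay } 1 .. 4;    # grows, then caps at 4
$backoff->reset_delay;
my $after_reset = $backoff->next_delay;               # back to the start value
print join(', ', @delays, $after_reset), "\n";
```

A consumer loop would sleep for next_delay seconds after an empty poll and call reset_delay after a successful one.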

plugins

my $plugins = $queue->plugins;

Returns a hash containing the plugins registered with this queue.

retries

my $retries = $queue->retries;
$queue->retries(5);

The number of times a job will be picked up from the queue before it is marked as failed.

timeout

my $timeout = $queue->timeout;
$queue->timeout(10);

The time (in seconds) a job is allowed to stay in Retrieved state before it is released back into Pending state. Defaults to 60 seconds.
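
The way timeout and retries interact can be pictured with a small in-memory model. This is a sketch of the rules as described above, not code from MangoX::Queue: the helper reap_job and the job fields attempt and retrieved_at are hypothetical, and it assumes that a timed-out job which has used up its retries is marked 'Failed' instead of being released.

```perl
use strict;
use warnings;

# Hypothetical model of the timeout/retries rules; not the module's code.
# $job is a plain hash with assumed fields: status, attempt, retrieved_at.
sub reap_job {
    my ($job, %opt) = @_;
    my $timeout = $opt{timeout} // 60;    # documented default, in seconds
    my $retries = $opt{retries} // 5;     # assumed; real default not stated here
    my $now     = $opt{now}     // time;

    # Only jobs that are currently held by a consumer can time out.
    return $job unless $job->{status} eq 'Retrieved';
    return $job if $now - $job->{retrieved_at} < $timeout;

    # Timed out: give up if the retries are exhausted, otherwise release.
    $job->{status} = $job->{attempt} >= $retries ? 'Failed' : 'Pending';
    return $job;
}

my $job = { status => 'Retrieved', attempt => 1, retrieved_at => 1000 };

reap_job($job, now => 1030);    # 30 seconds held: within the timeout
print "$job->{status}\n";       # still Retrieved

reap_job($job, now => 1061);    # 61 seconds held: released
print "$job->{status}\n";       # Pending
```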

EVENTS

MangoX::Queue inherits from Mojo::EventEmitter and emits the following events.

consumed

on $queue consumed => sub {
	my ($queue, $job) = @_;
	# ...
};

Emitted when an item is consumed (either via consume or fetch).

dequeued

on $queue dequeued => sub {
	my ($queue, $job) = @_;
	# ...
};

Emitted when an item is dequeued.

enqueued

on $queue enqueued => sub {
	my ($queue, $job) = @_;
	# ...
};

Emitted when an item is enqueued.

METHODS

MangoX::Queue implements the following methods.

consume

# In blocking mode
while(my $job = consume $queue) {
	# ...
}
while(my $job = $queue->consume) {
	# ...
}

# In non-blocking mode
consume $queue sub {
	my ($job) = @_;
	# ...
};
$queue->consume(sub {
	my ($job) = @_;
	# ...
});

Waits for jobs to arrive on the queue, sleeping between queue checks using MangoX::Queue::Delay or Mojo::IOLoop.

Currently sets the status to 'Retrieved' before returning the job.

dequeue

my $job = fetch $queue;
dequeue $queue $job;

Dequeues a job. Currently removes it from the collection.

enqueue

enqueue $queue 'job name';
enqueue $queue [ 'some', 'data' ];
enqueue $queue +{ foo => 'bar' };

$queue->enqueue('job name');
$queue->enqueue([ 'some', 'data' ]);
$queue->enqueue({ foo => 'bar' });

Add an item to the queue.

Currently uses priority 1 with a job status of 'Pending'.

fetch

# In blocking mode
my $job = fetch $queue;
my $job = $queue->fetch;

# In non-blocking mode
fetch $queue sub {
	my ($job) = @_;
	# ...
};
$queue->fetch(sub {
	my ($job) = @_;
	# ...
});

Fetch a single job from the queue, returning undef if no jobs are available.

Currently sets job status to 'Retrieved'.

get

my $job = get $queue $id;

Gets a job from the queue by ID. Doesn't change the job status.

get_options

my $options = $queue->get_options;

Returns the Mango::Collection options hash used by find_and_modify to identify and update available queue items.

release

my $consumer = consume $queue sub {
	# ...
};
release $queue $consumer;

Releases a non-blocking consumer from watching a queue.

requeue

my $job = fetch $queue;
requeue $queue $job;

Requeues a job. Sets the job status to 'Pending'.
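
A typical worker combines fetch, dequeue and requeue: it fetches a job, dequeues it when processing succeeds, and requeues it when processing fails. The sketch below shows that pattern using only the method names documented here. The helper process_one is hypothetical, and Sketch::MemoryQueue is an in-memory stand-in (with an assumed data field on each job) so the example runs without MongoDB; it is not part of MangoX::Queue.

```perl
use strict;
use warnings;

# Process at most one job: dequeue on success, requeue on failure.
# $queue is any object providing fetch, dequeue and requeue.
sub process_one {
    my ($queue, $handler) = @_;
    my $job = $queue->fetch or return 0;    # undef means nothing is available
    if (eval { $handler->($job); 1 }) {
        $queue->dequeue($job);              # done: remove the job
    }
    else {
        warn "job failed, requeueing: $@";
        $queue->requeue($job);              # status goes back to 'Pending'
    }
    return 1;
}

# Hypothetical in-memory stand-in; NOT part of MangoX::Queue.
package Sketch::MemoryQueue {
    sub new { return bless({ jobs => [] }, shift) }

    sub enqueue {
        my ($self, $data) = @_;
        push @{ $self->{jobs} }, { status => 'Pending', data => $data };
        return;
    }

    sub fetch {
        my ($self) = @_;
        my ($job) = grep { $_->{status} eq 'Pending' } @{ $self->{jobs} };
        $job->{status} = 'Retrieved' if $job;
        return $job;
    }

    sub dequeue {
        my ($self, $job) = @_;
        @{ $self->{jobs} } = grep { $_ != $job } @{ $self->{jobs} };
        return;
    }

    sub requeue {
        my ($self, $job) = @_;
        $job->{status} = 'Pending';
        return;
    }
}

my $queue = Sketch::MemoryQueue->new;
$queue->enqueue('ok');
$queue->enqueue('boom');

# The handler dies on the second job, so that job is requeued, not removed.
my $handler = sub { die "bad payload\n" if $_[0]{data} eq 'boom' };
process_one($queue, $handler) for 1 .. 2;

print scalar(@{ $queue->{jobs} }), " job(s) left\n";
```

With a real MangoX::Queue object, the same process_one call would operate on the MongoDB collection instead of the in-memory list.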

update

my $job = fetch $queue;
$job->{status} = 'Failed';
update $queue $job;

Updates a job in the queue.

watch

# In blocking mode
my $id = enqueue $queue 'test';
watch $queue $id, 'Complete'; # blocks until job is complete

# In non-blocking mode
my $id = enqueue $queue 'test';
watch $queue $id, 'Complete' => sub {
	# ...
};

Waits for a job to enter a certain status.

SEE ALSO

Mojolicious, Mango