Beanstalkd消息队列的安装与使用

  • 内容
  • 评论
  • 相关

一、Beanstalkd是什么?

Beanstalkd是一个高性能,轻量级的分布式内存队列

 

二、Beanstalkd特性

1、支持优先级(支持任务插队)
2、延迟(实现定时任务)
3、持久化(定时把内存中的数据刷到binlog日志)
4、预留(把任务设置成预留,消费者无法取出任务,等某个合适时机再拿出来处理)
5、任务超时重发(消费者必须在指定时间内处理任务,如果没有则认为任务失败,重新进入队列)

 

三、Beanstalkd核心元素

生产者 -> 管道(tube) -> 任务(job) -> 消费者

Beanstalkd可以创建多个管道,管道里面存了很多任务,消费者从管道中取出任务进行处理。

 

四、任务job状态

delayed 延迟状态
ready 准备好状态
reserved 消费者把任务读出来,处理时
buried 预留状态
delete 删除状态

五、安装Beanstalkd

1
http://kr.github.io/beanstalkd/download.html

下载beanstalkd-1.10.tar.gz

1
2
3
> tar -xf beanstalkd-1.10.tar.gz
> cd beanstalkd-1.10
> make

查看beanstalkd参数信息

1
> ./beanstalkd -h

启动beanstalkd

1
> ./beanstalkd -l 127.0.0.1 -p 11300 -b /data/beanstalkd/binlog &

-b表示开启binlog,断电后重启自动恢复任务

 

六、下载Pheanstalk类

首先安装composer

1
2
3
> curl -sS https://getcomposer.org/installer | php
> mv composer.phar /usr/local/bin/composer
> composer require pda/pheanstalk

编写一个简单脚本查看信息

1
2
3
4
5
6
7
8
<?php
require './vendor/autoload.php';
use Pheanstalk\Pheanstalk;
$p = new Pheanstalk('127.0.0.1', 11300);
//查看beanstalkd当前的状态信息
var_dump($p->stats());

 

七、Pheanstalk使用方法

维护方法

1
2
3
4
5
6
7
stats() 查看状态方法
listTubes() 目前存在的管道
listTubesWatched() 目前监听的管道
statsTube() 管道的状态
useTube() 指定使用的管道
statsJob() 查看任务的详细信息
peek() 通过任务ID获取任务

生产者方法

1
2
putInTube() 往管道中写入数据
put() 配合useTube()使用

消费者方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
watch() 监听管道,可以同时监听多个管道
ignore() 不监听管道
reserve() 以阻塞方式监听管道,获取任务
reserveFromTube()
release() 把任务重新放回管道
bury() 把任务预留
peekBuried() 把预留任务读取出来
kickJob() 把buried状态的任务设置成ready
kick() 批量把buried状态的任务设置成ready
peekReady() 把准备好的任务读取出来
peekDelayed() 把延迟的任务读取出来
pauseTube() 给管道设置延迟
resumeTube() 取消管道延迟
touch() 让任务重新计算ttr时间,给任务续命

生产者producer.php代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<?php
require './vendor/autoload.php';
use Pheanstalk\Pheanstalk;
//创建一个Pheanstalk对象
$p = new Pheanstalk('192.168.1.222', 11300);
$data = array(
    'id' => 1,
    'name' => 'test',
);
//向userReg管道中添加任务,返回任务ID
//put()方法有四个参数
//第一个任务的数据
//第二个任务的优先级,值越小,越先处理
//第三个任务的延迟
//第四个任务的ttr超时时间
$id = $p->useTube('userReg')->put(json_encode($data));
//获取任务
$job = $p->peek($id);
//查看任务状态
print_r($p->statsJob($job));

消费者consumer.php代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<?php
require './vendor/autoload.php';
use Pheanstalk\Pheanstalk;
//创建一个Pheanstalk对象
$p = new Pheanstalk('192.168.1.222', 11300);
//监听userReg管道,忽略default管道
$job = $p->watch('userReg')->ignore('default')->reserve();
$data = json_decode($job->getData());
//打印任务中的数据
print_r($data);
//最后删除任务,表示任务处理完成
$p->delete($job);

 

评论

0条评论

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注