Beanstalkd消息队列的安装与使用
一、Beanstalkd是什么?
Beanstalkd是一个高性能,轻量级的分布式内存队列
二、Beanstalkd特性
1、支持优先级(支持任务插队)
2、延迟(实现定时任务)
3、持久化(定时把内存中的数据刷到binlog日志)
4、预留(把任务设置成预留,消费者无法取出任务,等某个合适时机再拿出来处理)
5、任务超时重发(消费者必须在指定时间内处理任务,如果没有则认为任务失败,重新进入队列)
三、Beanstalkd核心元素
生产者 -> 管道(tube) -> 任务(job) -> 消费者
Beanstalkd可以创建多个管道,管道里面存了很多任务,消费者从管道中取出任务进行处理。
四、任务job状态
delayed 延迟状态
ready 准备好状态
reserved 消费者把任务读出来,处理时
buried 预留状态
delete 删除状态
五、安装Beanstalkd
1
|
http: //kr.github.io/beanstalkd/download.html |
下载beanstalkd-1.10.tar.gz
1
2
3
|
> tar -xf beanstalkd-1.10.tar.gz > cd beanstalkd-1.10 > make |
查看beanstalkd参数信息
1
|
> ./beanstalkd -h |
启动beanstalkd
1
|
> ./beanstalkd -l 127.0.0.1 -p 11300 -b /data/beanstalkd/binlog & |
-b表示开启binlog,断电后重启自动恢复任务
六、下载Pheanstalk类
首先安装composer
1
2
3
|
> curl -sS https: //getcomposer.org/installer | php > mv composer.phar /usr/local/bin/composer > composer require pda/pheanstalk |
编写一个简单脚本查看信息
1
2
3
4
5
6
7
8
|
<?php require './vendor/autoload.php' ; use Pheanstalk\Pheanstalk; $p = new Pheanstalk( '127.0.0.1' , 11300); //查看beanstalkd当前的状态信息 var_dump( $p ->stats()); |
七、Pheanstalk使用方法
维护方法
1
2
3
4
5
6
7
|
stats() 查看状态方法 listTubes() 目前存在的管道 listTubesWatched() 目前监听的管道 statsTube() 管道的状态 useTube() 指定使用的管道 statsJob() 查看任务的详细信息 peek() 通过任务ID获取任务 |
生产者方法
1
2
|
putInTube() 往管道中写入数据 put() 配合useTube()使用 |
消费者方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
watch() 监听管道,可以同时监听多个管道 ignore() 不监听管道 reserve() 以阻塞方式监听管道,获取任务 reserveFromTube() release() 把任务重新放回管道 bury() 把任务预留 peekBuried() 把预留任务读取出来 kickJob() 把buried状态的任务设置成ready kick() 批量把buried状态的任务设置成ready peekReady() 把准备好的任务读取出来 peekDelayed() 把延迟的任务读取出来 pauseTube() 给管道设置延迟 resumeTube() 取消管道延迟 touch() 让任务重新计算ttr时间,给任务续命 |
生产者producer.php代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
<?php require './vendor/autoload.php' ; use Pheanstalk\Pheanstalk; //创建一个Pheanstalk对象 $p = new Pheanstalk( '192.168.1.222' , 11300); $data = array ( 'id' => 1, 'name' => 'test' , ); //向userReg管道中添加任务,返回任务ID //put()方法有四个参数 //第一个任务的数据 //第二个任务的优先级,值越小,越先处理 //第三个任务的延迟 //第四个任务的ttr超时时间 $id = $p ->useTube( 'userReg' )->put(json_encode( $data )); //获取任务 $job = $p ->peek( $id ); //查看任务状态 print_r( $p ->statsJob( $job )); |
消费者consumer.php代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
<?php require './vendor/autoload.php' ; use Pheanstalk\Pheanstalk; //创建一个Pheanstalk对象 $p = new Pheanstalk( '192.168.1.222' , 11300); //监听userReg管道,忽略default管道 $job = $p ->watch( 'userReg' )->ignore( 'default' )->reserve(); $data = json_decode( $job ->getData()); //打印任务中的数据 print_r( $data ); //最后删除任务,表示任务处理完成 $p -> delete ( $job ); |
发表回复