消息队列应用场景:
消息队列的主要特点是异步处理,主要目的是减少请求响应时间和解耦。所以主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列。同时由于使用了消息队列,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦和。
Kafka
Hydra 底层的支持组件
Kafka是linkedin开源的MQ系统,主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,0.8开始支持复制,不支持事务,适合产生大量数据的互联网服务的数据收集业务。
应用方法:
需要先了解几个名词:
- 消息, 需要处理的事件内容, 包含消息名字, 消息内容。
- 分区: 为了增加生产效率,可能会建立多个消息分区,分区还可以用来做负载均衡, 创建消息的时候,会指定分区的个数,一旦确定个数,只能扩容,不能缩容。
- 生产者: 消息的生产者, 一个生产者,会产生不同类型的消息,每个生产者发送消息的时候可以指定分区号。
- 消费者: 消息的消费者, 一个消费者,消费一类消息。
包含Hydra SDK
生产者
需要在 res.yml 配置 2个参数:
1) ZK_LIST :
'host1:port1,host2:port2'
zookeeper 服务器的地址
2) KAFKA_PARTIONS: 分区的个数,创建topic的时候已经确定,如果topic的分区扩容了,这个数字相应扩大。
在php.ini 开启zookeeper.so 扩展引用
$r
= Hydra::trigger(
'test1'
,
"hello hydra"
);
// test1: 消息名字 “hello hydra” 消息内容
消费者
$conf
=
new
HydraConf;
$conf
->host =
'192.168.1.40:2181'
;
// 集群地址,若有多个机器,以逗号分割
$conf
->topic =
'test1'
;
// 想消费的消息 名字
$conf
->subscriber =
'dy'
;
// 消费者的名字
function
encode(
$msg
) {
// 处理消息的函数, 使用者的业务逻辑。 这个$msg 就是hydra 消费者 收到的消息。
$t
= time();
$r
=
base64_encode
(
$msg
->getMessage());
if
(
$t
% 10 == 0)
return
false;
else
return
true;
}
/**************** 切记 **************
消息处理成功返回 : return true;
消息处理失败返回: return false;
************************************/
$logger
= XLogKit::logger(
'hydra'
);
$hydraSvc
=
new
HydraSvc(
$conf
,
$logger
);
$hydraSvc
->serv(encode, false);
// 第一个参数,就是上面用户处理业务逻辑的函数名, 第二个参数 默认为true, 表示 如果消息处理失败,就会异常退出。 否则就会忽略这个消息。