1. 消息订阅
首先,需要使用phpredis扩展来与 Redis 进行交互。phpredis 提供了许多 Redis 命令的封装,方便了 PHP 对 Redis 的操作。
在订阅 Redis 消息前,需要先建立与 Redis 的连接:
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
?>
接着,通过以下代码进行订阅操作:
$queue = 'report'; // 订阅频道名
$redis->subscribe([$queue], function ($redis, $channel, $msg) {
// 处理消息
});
?>
注意:subscribe 方法是阻塞式的,会一直监听 Redis 消息,直到调用 unsubscribe 方法才会停止。因此,建议将订阅代码放在一个守护进程中执行,以免阻断主进程。
2. 消息处理
当 Redis 接收到消息时,subscribe 方法传递回调函数中,可以在回调函数中处理消息:
$queue = 'report';
$redis->subscribe([$queue], function ($redis, $channel, $msg) {
$data = json_decode($msg, true);
// 处理消息
});
?>
在这里,将 Redis 接收到的消息解析成数组,并进行进一步操作。
2.1. 写入日志文件
在处理 Redis 消息时,可以将消息写入到日志文件中,用于后续生成报告。下面是一个写入日志文件的示例:
$queue = 'report';
$logFile = '/path/to/log/file.log';
$redis->subscribe([$queue], function ($redis, $channel, $msg) use ($logFile) {
$data = json_decode($msg, true);
// 日志写入
$logMessage = sprintf("Received message at %s:%s - %s\n",
$data['date'], $data['time'], $data['message']);
file_put_contents($logFile, $logMessage, FILE_APPEND);
});
?>
这里使用了 file_put_contents 函数将消息写入到日志文件中。为了避免每次写入日志时都需要打开文件句柄,可以设定 FILE_APPEND 标志,将日志信息追加到文件末尾。
注意:日志文件路径应该是绝对路径,尽量避免写入到程序目录中,否则可能会有安全风险。
2.2. 生成报告数据
可以在处理 Redis 消息时将消息数据缓存起来,并基于缓存数据生成报告。这里使用一个数组 $reports 缓存消息数据,当缓存数组中的数据量达到阈值时,就生成一份报告,清空缓存数组。
$queue = 'report';
$threshold = 100; // 报告阈值
$reports = [];
$redis->subscribe([$queue], function ($redis, $channel, $msg) use ($threshold, &$reports) {
$data = json_decode($msg, true);
$reports[] = $data;
// 达到阈值,生成报告
if (count($reports) >= $threshold) {
generateReport($reports);
$reports = [];
}
});
function generateReport($data) {
// 报告生成逻辑
}
?>
在上面的代码中,使用了阈值 $threshold 参数来控制报告的生成频率,当缓存数组 $reports 中的数据量达到 $threshold 时,就调用 generateReport 方法生成报告,并清空缓存数组。generateReport 函数需要根据 $data 数组生成报告数据,然后将报告数据写入到 Redis 的另一个频道中,供其他应用程序使用。
3. 报告生成
报告生成逻辑根据具体需求而定,这里我们假定需要生成一份按小时统计的报告,包含每个小时内收到的消息数量。下面的代码演示了如何按小时统计消息数量:
function generateReport($data) {
// 消息按小时归类
$hourlyData = [];
foreach ($data as $item) {
$hour = date('Y-m-d H', strtotime($itme['date'] . ' ' . $item['time']));
if (!isset($hourlyData[$hour])) {
$hourlyData[$hour] = 0;
}
$hourlyData[$hour] += 1;
}
// 报告生成
$report = [];
foreach ($hourlyData as $hour => $count) {
$hourlyReport = [
'hour' => $hour,
'count' => $count
];
$report[] = $hourlyReport;
}
// 将报告写入 Redis
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$queue = 'report_hourly';
$jsonReport = json_encode($report);
$redis->publish($queue, $jsonReport);
}
?>
在上面的代码中,使用 $hourlyData 数组将收到的消息按小时归类。然后,使用 $hourlyData 数组生成报告数据,将报告数据写入到 Redis 的 report_hourly 频道中。
4. 结语
上面介绍了如何使用 PHP 实现持续监听 Redis 的消息订阅并生成报告。消息订阅可以使用 Redis 的 subscribe 方法实现,消息处理可以将消息数据写入到日志文件中,并生成报告数据。报告生成逻辑应根据具体需求进行实现,可以根据当前示例代码进行修改和扩展。