由于PHP不支持多線程,但是作為一個完善的系統(tǒng),有很多操作都是需要異步完成的。為了完成這些異步操作,我們做了一個基于Redis隊列任務(wù)系統(tǒng)。
大家知道,一個消息隊列處理系統(tǒng)主要分為兩大部分:消費者和制造者。
在我們的系統(tǒng)中,主系統(tǒng)作為制造者,任務(wù)系統(tǒng)作為消費者。
具體的工作流程如下: 1、主系統(tǒng)將需要需要處理的任務(wù)名稱+任務(wù)參數(shù)push到隊列中。 2、任務(wù)系統(tǒng)實時的對任務(wù)隊列進行pop,pop出來一個任務(wù)就fork一個子進程,由子進程完成具體的任務(wù)邏輯。
/** * 啟動守護進程 */ public function runAction() { Tools::log_message('ERROR', 'daemon/run' . ' | action: restart', 'daemon-'); while (true) { $this->fork_process(); } exit; } /** * 創(chuàng)建子進程 */ private function fork_process() { $ppid = getmypid(); $pid = pcntl_fork(); if ($pid == 0) {//子進程 $pid = posix_getpid(); //echo "* Process {$pid} was created \n\n"; $this->mq_process(); exit; } else {//主進程 $pid = pcntl_wait($status, WUNTRACED); //取得子進程結(jié)束狀態(tài) if (pcntl_wifexited($status)) { //echo "\n\n* Sub process: {$pid} exited with {$status}"; //Tools::log_message('INFO', 'daemon/run succ' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid ); } else { Tools::log_message('ERROR', 'daemon/run fail' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid, 'daemon-'); } } } /** * 業(yè)務(wù)任務(wù)隊列處理 */ private function mq_process() { $data_pop = $this->masterRedis->rPop($this->redis_list_key); $data = json_decode($data_pop, 1); if (!$data) { return FALSE; } $worker = '_task_' . $data['worker']; $class_name = isset($data['class']) ? $data['class'] : 'TaskproModel'; $params = $data['params']; $class = new $class_name(); $class->$worker($params); return TRUE; }
+VX:PHPopen888PHP中高級教程分享,專注laravel,swoole,tp,分布式高并發(fā)處理教程分享