2017-09-06 16 views
0

저는 시간이 많이 걸리는 작업을 별도의 프로세스로 추출하려고합니다. 불행히도 멀티 스레딩은 PHP에서는 옵션이 아닌 것 같지만 popen을 사용하여 새로운 PHP 프로세스를 만들 수 있습니다.PHP popen 프로세스 제한?

유스 케이스는 다음과 같습니다. 매분마다 실행되는 cronjob이 있으며, 전송해야하는 이메일 캠페인이 있는지 확인합니다. 정확한 시간에 여러 캠페인을 보내야 할 수도 있지만, 지금은 매분마다 하나의 캠페인 만 선택합니다. 별도의 프로세스로 캠페인을 보내고 싶습니다. 동시에 여러 캠페인을 보낼 수 있습니다.

crontab을

* * * * * root /usr/local/bin/php /var/www/maintask.php 2>&1 

maintask.php

for ($i = 0; $i < 4; $i++) { 
    $processName = "Process_{$i}"; 
    echo "Spawn process {$processName}" . PHP_EOL; 

    $process = popen("php subtask.php?process_name={$processName} 2>&1", "r"); 
    stream_set_blocking($process, false); 
} 

하위 :

코드는 다음과 같이 (이 개념의 단지 증거주의) 보이는 .php

$process = $_GET['process_name']; 

echo "Started sleeping process {$process}" . PHP_EOL; 
sleep(rand(10, 40)); 
echo "Stopped sleeping process {$process}" . PHP_EOL; 

이제는 popen이 4를 생성하려고 시도하는 동안 popen은 언제든지 2 개의 프로세스 만 생성한다는 문제가 있습니다. 이유는 알 수 없습니다. 문서화 된 한계가없는 것 같습니다. 아마 이것은 내가 사용할 수있는 코어의 양에 의해 제한됩니까?

+0

[proc_open()] (http://php.net/manual/en/function.proc-open.php) 이전에는 성공적으로 만들 수 있습니다 여러 자식 프로세스. 문제가 해결 되었습니까? –

+0

코드 세그먼트를 파헤쳐 볼 수 있습니까? 코드가 동시에 2 개의 프로세스 만 실행한다는 것을 어떻게 알 수 있습니까? 게시물의 코드로는이를 테스트 할 수 없습니다. –

+0

@WeeZel 불행히도 아직이 문제를 해결할 수 없었습니다. "ps -aux"를 사용하여 활성 프로세스를 모니터링하여 실행중인 프로세스가 2 개뿐인지 확인했습니다. subtask.php 프로세스 중 하나가 완료 되 자마자 새로운 것이 발생합니다. 제공된 코드에 대해 알고 있다고 생각합니다. 단순히 너무 복잡하게 만드는 모든 실제 코드를 게시하지 않고 단순히 달성하려는 것을 보여주기위한 시도였습니다. 내 문제의 작업 시연을하고 원래 게시물을 업데이트 할 수 있는지 알게 될 것입니다. 감사! –

답변

1

각 작업이 언제 시작되고 끝날 지 그리고 얼마나 오래 기다려야하는지 알 수 있도록 subtask.php을 수정했습니다. 프로세스가 시작되면 이제 볼 수 있습니다/당신이 수면 시간을 줄일 수 있습니다 중지 - 프로세스가 실행 중일 때 보여 ps -aux를 사용할 필요가 없습니다

subtask.php

<?php 
$process = $argv[1]; 

$sleepTime = rand(1, 10); 
echo date('Y-m-d H:i:s') . " - Started sleeping process {$process} ({$sleepTime})" . PHP_EOL; 
sleep($sleepTime); 
echo date('Y-m-d H:i:s') . " - Stopped sleeping process {$process}" . PHP_EOL; 

내가 추가 한 당신이 maxProcesses을 설정 한 것보다 queue() 개 항목 (시도 32) 때 당신이 그것을 테스트 할 수 있도록 maintask.php 코드에 클래스 ... 재미가 시작
참고 : 결과가 다시 순서대로 올 것이다 그들은

를 완료

maintask.php 내가 사용했습니다

<?php 
class ParallelProcess 
{ 
    private $maxProcesses = 16; // maximum processes 
    private $arrProcessQueue = []; 
    private $arrCommandQueue = []; 

    private function __construct() 
    { 
    } 

    private function __clone() 
    { 
    } 

    /** 
    * 
    * @return \static 
    */ 
    public static function create() 
    { 
     $result = new static(); 
     return $result; 
    } 

    /** 
    * 
    * @param int $maxProcesses 
    * @return \static 
    */ 
    public static function load($maxProcesses = 16) 
    { 
     $result = self::create(); 
     $result->setMaxProcesses($maxProcesses); 
     return $result; 
    } 

    /** 
    * get maximum processes 
    * 
    * @return int 
    */ 
    public function getMaxProcesses() 
    { 
     return $this->maxProcesses; 
    } 

    /** 
    * set maximum processes 
    * 
    * @param int $maxProcesses 
    * @return $this 
    */ 
    public function setMaxProcesses($maxProcesses) 
    { 
     $this->maxProcesses = $maxProcesses; 
     return $this; 
    } 

    /** 
    * number of entries in the process queue 
    * 
    * @return int 
    */ 
    public function processQueueLength() 
    { 
     $result = count($this->arrProcessQueue); 
     return $result; 
    } 

    /** 
    * number of entries in the command queue 
    * 
    * @return int 
    */ 
    public function commandQueueLength() 
    { 
     $result = count($this->arrCommandQueue); 
     return $result; 
    } 


    /** 
    * process open 
    * 
    * @staticvar array $arrDescriptorspec 
    * @param string $strCommand 
    * @return $this 
    * @throws \Exception 
    */ 
    private function p_open($strCommand) 
    { 
     static $arrDescriptorSpec = array(
      0 => array('file', '/dev/null', 'r'), // stdin is a file that the child will reda from 
      1 => array('pipe', 'w'), // stdout is a pipe that the child will write to 
      2 => array('file', '/dev/null', 'w') // stderr is a pipe that the child will write to 
     ); 

     $arrPipes = array(); 
     if (($resProcess = proc_open($strCommand, $arrDescriptorSpec, $arrPipes)) === false) { 
      throw new \Exception("error: proc_open() failed!"); 
     } 

     $resStream = &$arrPipes[1]; 

     if (($blnSetBlockingResult = stream_set_blocking($resStream, true)) === false) { 
      throw new \Exception("error: stream_set_blocking() failed!"); 
     } 

     $this->arrProcessQueue[] = array(&$strCommand, &$resProcess, &$resStream); 
     return $this; 
    } 

    /** 
    * execute any queued commands 
    * 
    * @return $this 
    */ 
    private function executeCommand() 
    { 
     while ($this->processQueueLength() < $this->maxProcesses and $this->commandQueueLength() > 0) { 
      $strCommand = array_shift($this->arrCommandQueue); 
      $this->p_open($strCommand); 
     } 
     return $this; 
    } 

    /** 
    * process close 
    * 
    * @param array $arrQueueEntry 
    * @return $this 
    */ 
    private function p_close(array $arrQueueEntry) 
    { 
     $resProcess = $arrQueueEntry[1]; 
     $resStream = $arrQueueEntry[2]; 

     fclose($resStream); 

     $this->returnValue = proc_close($resProcess); 

     $this->executeCommand(); 
     return $this; 
    } 

    /** 
    * queue command 
    * 
    * @param string $strCommand 
    * @return $this 
    */ 
    public function queue($strCommand) { 
     // put the command on the $arrCommandQueue 
     $this->arrCommandQueue[] = $strCommand; 
     $this->executeCommand(); 
     return $this; 
    } 

    /** 
    * read from stream 
    * 
    * @param resource $resStream 
    * @return string 
    */ 
    private static function readStream($resStream) 
    { 
     $result = ''; 
     while (($line = fgets($resStream)) !== false) { 
      $result .= $line; 
     } 
     return $result; 
    } 

    /** 
    * read a result from the process queue 
    * 
    * @return string|false 
    */ 
    private function readProcessQueue() 
    { 
     $result = false; 
     reset($this->arrProcessQueue); 
     while ($result === false && list($key, $arrQueueEntry) = each($this->arrProcessQueue)) { 
      $arrStatus = proc_get_status($arrQueueEntry[1]); 
      if ($arrStatus['running'] === false) { 
       array_splice($this->arrProcessQueue, $key, 1); 
       $resStream = $arrQueueEntry[2]; 
       $result = self::readStream($resStream); 
       $this->p_close($arrQueueEntry); 
      } 
     } 
     return $result; 
    } 

    /** 
    * get result from process queue 
    * 
    * @return string|false 
    */ 
    public function readNext() 
    { 
     $result = false; 
     if ($this->processQueueLength() === 0) { 
     } else { 
      while ($result === false and $this->processQueueLength() > 0) { 
       $result = $this->readProcessQueue(); 
      } 
     } 
     return $result; 
    } 
} 

set_time_limit(0); // don't timeout 

$objParallelProcess = ParallelProcess::load(8); // allow up to 8 parallel processes 

for ($i = 0; $i < 4; $i++) { 
    $processName = "Process_{$i}"; 
    echo date('Y-m-d H:i:s') . " - Queue process {$processName}" . PHP_EOL; 
    $objParallelProcess->queue("php subtask.php {$processName}"); // queue process 
} 

// loop through process queue 
while (($strResponse = $objParallelProcess->readNext()) !== false) { // read next result and run next command if one is queued 
    // process response 
    echo $strResponse; 
} 
+0

그것은 훌륭하게 작동합니다! 무리 감사! –