Asked  7 Months ago    Answers:  5   Viewed   32 times

I am using pcntl to speed up a fairly heavy PHP CLI script. It consists mostly of a single class that is in charge of sending all of the automated emails for my application.

My goal is the following: I want to assign each process a single task inside a foreach loop. The implementation I've used is shown in the code example below.

The problem is that once you fork a process, it executes asynchronously and also gets a copy of the parent process's stack. In my case, what happens is that one task simply executes several times. My question is: how can I design this script to be smarter in order to avoid such behavior?

Code:

/**
@description This is the main procedure of this class. It iterates over the relevant tasks and sends the emails using the SendGrid wrapper class.
@see SendGridWrapper
@return void
*/
public function execute(){
    if(!isset($this->tasks)){
        throw new exception("Please call getRelevantTasks() prior to trying to execute anything");
    }
$proccesses = array();
foreach($this->tasks as $myTask){
    $pid = pcntl_fork();
    if($pid){
        $proccesses[] = $pid;
    }
    else if($pid == -1){
        die('FORK FAILED, STATUS -1');
    }
    else{   
        if(isset($myTask['recipient_model'])){

            $this->currentModel = $myTask['recipient_model'];
            $lang = $myTask['lang'];
            $classPath = self::$modelsDir . $myTask['recipient_model'] . '.php';
            $className = $myTask['recipient_model'];
            if(!class_exists($myTask['recipient_model'] )){
                require_once(dirname(__FILE__) . '/../' .  $classPath); 
            }
            else if(isset($recipientFetcher)){
                unset($this->model);
                unset($this->mailingList);
                unset($this->substitutionList);
            }
            $this->model = null;
            $this->mailingList = null;
            $this->substitutionList = null;
            $this->model = new $className($myTask['lang']);
            $addresses = $this->model->getMailRecipients();
            if(empty($addresses) || sizeof($addresses) == 0){
                continue;
            }
            $this->model->prepare();
            $this->substitutionList = $this->model->getDynamicParams();

        }
        else{
            throw new exception('No recipient model was found');
        }

        foreach($addresses as $myMail){
            $this->mailingList[$myMail['personal_email']] = $myMail['contact_name'];
        }
        $templatePath = dirname(__FILE__) . '/../'; 
        $templatePath .= $lang ? self::$templatesDirEn . $myTask['html_email_path'] : self::$templatesDirHe . $myTask['html_email_path'];
        $html = file_get_contents($templatePath);
        $this->sendMail($html, $myTask['task_schedule_id']);
        echo "model:" . $myTask['recipient_model'];
        echo $this->log;
        $this->log = "";
        die("\r\n Child process has been executed successfully\r\n");

    }   
}
if($pid){
    foreach($proccesses as $key => $val){
         pcntl_waitpid($val, $status, WUNTRACED);
    }
}           

}

Thanks in advance, Oleg.

 Answers

77

Introduction

I see you are trying to send mail with $this->sendMail($html, $myTask['task_schedule_id']); and I think using multiple processes for this task is a really bad idea. You should consider using a message queue instead, because sending email can be very slow.

Use a Queue System

You should be using Gearman, ZeroMQ or Beanstalkd for this task. In the worst case, implement your own simple message queue with memcached.

Here is a typical Gearman Example: https://stackoverflow.com/questions/13855907/when-to-send-auto-email-instantly-on-button-click-or-later

Quick Fix

Remove all of that code and put it in a function called execute_worker, which you can then pass each task to.

// Break tasks into groups
$tasks = array_chunk(range("A", "Z"), 10);
foreach($tasks as $task) {
    $pid = pcntl_fork();
    if ($pid == - 1) {
        // throw already leaves the loop, so no break is needed
        throw new ErrorException('FORK FAILED, STATUS -1');
    }

    if ($pid == 0) {
        execute_worker($task); // In Child
        exit(); // In Child: never fall through into the next iteration
    }
}
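
One caveat with the loop above: the parent never waits for its children, and any path in the child that reaches the end of an iteration without exiting (a continue inside the child branch, for instance) sends the child back into the foreach, where it forks again and repeats work. That is a likely cause of tasks running several times. Here is a minimal sketch of the full pattern, assuming the pcntl extension is loaded. runInChildren() and the $worker callback are hypothetical names for illustration, not part of the original code.

```php
<?php
/**
 * Fork one child per task and wait for all of them.
 * Returns a map of task key => child exit code (-1 if the child did not exit normally).
 * runInChildren() is a hypothetical helper name.
 */
function runInChildren(array $tasks, callable $worker): array
{
    $pids = [];
    foreach ($tasks as $key => $task) {
        $pid = pcntl_fork();
        if ($pid === -1) {
            throw new RuntimeException('fork failed');
        }
        if ($pid === 0) {
            // Child: handle exactly one task, then exit on every code path.
            $code = 1;
            try {
                $worker($task);
                $code = 0;
            } catch (Throwable $e) {
                fwrite(STDERR, $e->getMessage() . PHP_EOL);
            }
            exit($code);
        }
        // Parent: remember the child and move on to the next task.
        $pids[$key] = $pid;
    }

    // Parent: reap every child so none is left as a zombie.
    $exitCodes = [];
    foreach ($pids as $key => $pid) {
        pcntl_waitpid($pid, $status);
        $exitCodes[$key] = pcntl_wifexited($status) ? pcntl_wexitstatus($status) : -1;
    }
    return $exitCodes;
}
```

The try/catch in the child is there so that an exception cannot unwind back into the parent's loop; whatever happens, the child ends at exit().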

Using Threads

You can also use Worker or Thread in PHP with pthreads to speed up processing.

  • An easy to use, quick to learn Threading API for PHP5.3+
  • Execute any and all predefined and user declared methods and functions asynchronously
  • Ready made synchronization included, geared towards the PHP environment
  • Yes! Windows support

Simple Project

file_get_contents is said to be slow compared with curl, and nowhere close to the power of curl_multi_init, which allows multiple cURL handles to be processed in parallel.

See:

  • php get all the images from url which width and height >=200 more quicker
  • php - Fastest way to check presence of text in many domains (above 1000)
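
For comparison, this is roughly what the curl_multi approach mentioned above looks like. It is only a sketch and assumes the curl extension is loaded. fetchAll() is a hypothetical helper name, and the 10 second timeout is an arbitrary choice.

```php
<?php
/**
 * Fetch several URLs in parallel with curl_multi.
 * Returns an array with the same keys as $urls; each value is the response
 * body as returned by curl_multi_getcontent() (empty or null on failure).
 * fetchAll() is a hypothetical helper name.
 */
function fetchAll(array $urls, int $timeoutSeconds = 10): array
{
    $multi = curl_multi_init();
    $handles = [];

    foreach ($urls as $key => $url) {
        $ch = curl_init($url);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); // collect the body instead of printing it
        curl_setopt($ch, CURLOPT_TIMEOUT, $timeoutSeconds);
        curl_multi_add_handle($multi, $ch);
        $handles[$key] = $ch;
    }

    // Drive all transfers until none is still running.
    do {
        $status = curl_multi_exec($multi, $running);
        if ($running > 0 && curl_multi_select($multi, 1.0) === -1) {
            usleep(1000); // select can fail on some systems; avoid a busy loop
        }
    } while ($running > 0 && $status === CURLM_OK);

    $bodies = [];
    foreach ($handles as $key => $ch) {
        $bodies[$key] = curl_multi_getcontent($ch);
        curl_multi_remove_handle($multi, $ch);
    }
    return $bodies;
}

// Example call (the URLs are placeholders):
// $pages = fetchAll(['a' => 'https://example.com/a', 'b' => 'https://example.com/b']);
```

All requests share one process and one event loop here, so there is no forking or threading involved.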

Our objective is to implement our own multi file_get_contents version.

Multi file_get_contents Example

// My Storage
$s = new Storage();

// Threads Storage
$ts = array();

// Total Threads same as total pages
$pages = 100;

// Populate Threads (Don't Start Yet)
$i = 0;
while($i ++ < $pages) {
    $ts[] = new Process($s, $i);
}

// Start the timer
$start = microtime(true);

// Lets start all our Threads
foreach($ts as $t) {
    $t->start();
}

// Wait for all threads to complete
foreach($ts as $t) {
    $t->join();
}

printf("\n\nFound %s in %d pages", number_format($s->total), $pages);
printf("\nFinished %0.3f sec", microtime(true) - $start);

Output

php a.php
3:01:37: 3548 #START    {"page":1}
3:01:37: 7064 #START    {"page":2}
3:01:37: 10908 #START   {"page":3}
3:01:37: 10424 #START   {"page":4}
3:01:37: 11472 #START   {"page":5}
3:01:37: 3876 #START    {"page":6}
3:01:37: 7276 #START    {"page":7}
3:01:37: 11484 #START   {"page":8}
3:01:37: 932 #START     {"page":9}
3:01:37: 11492 #START   {"page":10}
3:01:37: 11500 #START   {"page":11}
3:01:37: 11508 #START   {"page":12}
3:01:37: 11504 #START   {"page":13}
3:01:37: 11512 #START   {"page":14}
3:01:37: 11516 #START   {"page":15}
3:01:37: 11520 #START   {"page":16}
3:01:37: 11524 #START   {"page":17}
3:01:37: 11528 #START   {"page":18}
3:01:37: 10816 #START   {"page":19}
3:01:37: 7280 #START    {"page":20}
3:01:37: 11556 #START   {"page":21}
3:01:37: 11560 #START   {"page":22}
3:01:37: 11564 #START   {"page":23}
3:01:37: 11612 #START   {"page":24}
3:01:37: 11616 #START   {"page":25}
3:01:37: 11600 #START   {"page":26}
3:01:37: 11608 #START   {"page":27}
3:01:37: 11568 #START   {"page":28}
3:01:37: 11452 #START   {"page":29}
3:01:38: 11624 #START   {"page":30}
3:01:38: 11628 #START   {"page":31}
3:01:38: 11632 #START   {"page":32}
3:01:38: 11636 #START   {"page":33}
3:01:38: 11644 #START   {"page":34}
3:01:38: 11648 #START   {"page":35}
3:01:38: 11652 #START   {"page":36}
3:01:38: 11656 #START   {"page":37}
3:01:38: 11660 #START   {"page":38}
3:01:38: 11664 #START   {"page":39}
3:01:38: 11668 #START   {"page":40}
3:01:38: 11672 #START   {"page":41}
3:01:38: 11676 #START   {"page":42}
3:01:38: 11680 #START   {"page":43}
3:01:38: 11684 #START   {"page":44}
3:01:38: 11688 #START   {"page":45}
3:01:38: 11692 #START   {"page":46}
3:01:38: 11696 #START   {"page":47}
3:01:38: 11700 #START   {"page":48}
3:01:38: 11704 #START   {"page":49}
3:01:38: 11712 #START   {"page":50}
3:01:38: 11708 #START   {"page":51}
3:01:38: 11716 #START   {"page":52}
3:01:38: 11720 #START   {"page":53}
3:01:38: 11724 #START   {"page":54}
3:01:38: 11728 #START   {"page":55}
3:01:38: 11732 #START   {"page":56}
3:01:38: 11736 #START   {"page":57}
3:01:38: 11740 #START   {"page":58}
3:01:38: 11744 #START   {"page":59}
3:01:38: 11748 #START   {"page":60}
3:01:38: 11752 #START   {"page":61}
3:01:38: 11756 #START   {"page":62}
3:01:38: 11760 #START   {"page":63}
3:01:38: 11764 #START   {"page":64}
3:01:38: 11768 #START   {"page":65}
3:01:38: 11772 #START   {"page":66}
3:01:38: 11776 #START   {"page":67}
3:01:38: 11780 #START   {"page":68}
3:01:38: 11784 #START   {"page":69}
3:01:38: 11788 #START   {"page":70}
3:01:38: 11792 #START   {"page":71}
3:01:38: 11796 #START   {"page":72}
3:01:38: 11800 #START   {"page":73}
3:01:38: 11804 #START   {"page":74}
3:01:38: 11808 #START   {"page":75}
3:01:38: 11812 #START   {"page":76}
3:01:38: 11816 #START   {"page":77}
3:01:38: 11820 #START   {"page":78}
3:01:38: 11824 #START   {"page":79}
3:01:38: 11828 #START   {"page":80}
3:01:38: 11832 #START   {"page":81}
3:01:38: 11836 #START   {"page":82}
3:01:38: 11840 #START   {"page":83}
3:01:38: 11844 #START   {"page":84}
3:01:38: 11848 #START   {"page":85}
3:01:38: 11852 #START   {"page":86}
3:01:38: 11856 #START   {"page":87}
3:01:38: 11860 #START   {"page":88}
3:01:38: 11864 #START   {"page":89}
3:01:38: 11868 #START   {"page":90}
3:01:38: 11872 #START   {"page":91}
3:01:38: 11876 #START   {"page":92}
3:01:38: 11880 #START   {"page":93}
3:01:38: 11884 #START   {"page":94}
3:01:38: 11888 #START   {"page":95}
3:01:38: 11892 #START   {"page":96}
3:01:38: 11896 #START   {"page":97}
3:01:38: 11900 #START   {"page":98}
3:01:38: 11904 #START   {"page":99}
3:01:38: 11908 #START   {"page":100}
3:01:38: 11508 #END             {"page":12,"byte":1141,"count":155839}
3:01:38: 10424 #END             {"page":4,"byte":1201,"count":553595}
3:01:38: 11516 #END             {"page":15,"byte":1204,"count":119612}
3:01:38: 3548 #END              {"page":1,"byte":1208,"count":6737525}
3:01:38: 11484 #END             {"page":8,"byte":1160,"count":257021}
3:01:38: 11472 #END             {"page":5,"byte":1175,"count":446411}
3:01:38: 10908 #END             {"page":3,"byte":1222,"count":787301}
3:01:38: 11492 #END             {"page":10,"byte":1175,"count":193958}
3:01:38: 11504 #END             {"page":13,"byte":1130,"count":141450}
3:01:38: 11528 #END             {"page":18,"byte":1102,"count":95511}
3:01:38: 11524 #END             {"page":17,"byte":1147,"count":102727}
3:01:38: 11560 #END             {"page":22,"byte":1111,"count":73536}
3:01:38: 11556 #END             {"page":21,"byte":1101,"count":78097}
3:01:38: 11500 #END             {"page":11,"byte":1201,"count":172820}
3:01:38: 932 #END               {"page":9,"byte":1159,"count":222922}
3:01:38: 11520 #END             {"page":16,"byte":1135,"count":110510}
3:01:38: 7064 #END              {"page":2,"byte":1165,"count":1264444}
3:01:38: 11512 #END             {"page":14,"byte":1123,"count":129721}
3:01:38: 11612 #END             {"page":24,"byte":1115,"count":65012}
3:01:38: 11600 #END             {"page":26,"byte":1134,"count":58928}
3:01:38: 7276 #END              {"page":7,"byte":1189,"count":301469}
3:01:38: 10816 #END             {"page":19,"byte":1120,"count":89609}
3:01:38: 11616 #END             {"page":25,"byte":1052,"count":61793}
3:01:38: 3876 #END              {"page":6,"byte":1188,"count":362101}
3:01:38: 7280 #END              {"page":20,"byte":1079,"count":83632}
3:01:38: 11564 #END             {"page":23,"byte":1076,"count":68909}
3:01:38: 11632 #END             {"page":32,"byte":1095,"count":44013}
3:01:38: 11652 #END             {"page":36,"byte":1042,"count":37185}
3:01:38: 11452 #END             {"page":29,"byte":1097,"count":50532}
3:01:38: 11636 #END             {"page":33,"byte":1097,"count":42148}
3:01:38: 11644 #END             {"page":34,"byte":1124,"count":40236}
3:01:38: 11664 #END             {"page":39,"byte":1078,"count":32792}
3:01:38: 11668 #END             {"page":40,"byte":1017,"count":31487}
3:01:38: 11608 #END             {"page":27,"byte":1117,"count":55561}
3:01:38: 11628 #END             {"page":31,"byte":1076,"count":46133}
3:01:38: 11624 #END             {"page":30,"byte":1111,"count":48265}
3:01:38: 11568 #END             {"page":28,"byte":1076,"count":52851}
3:01:38: 11656 #END             {"page":37,"byte":1068,"count":35590}
3:01:38: 11688 #END             {"page":45,"byte":1062,"count":26060}
3:01:38: 11680 #END             {"page":43,"byte":1081,"count":28013}
3:01:38: 11672 #END             {"page":41,"byte":1086,"count":30320}
3:01:38: 11724 #END             {"page":54,"byte":1060,"count":19900}
3:01:38: 11716 #END             {"page":52,"byte":1069,"count":21079}
3:01:38: 11732 #END             {"page":56,"byte":1038,"count":18748}
3:01:38: 11692 #END             {"page":46,"byte":1033,"count":25230}
3:01:38: 11696 #END             {"page":47,"byte":1098,"count":24469}
3:01:38: 11728 #END             {"page":55,"byte":1003,"count":19353}
3:01:38: 11648 #END             {"page":35,"byte":1105,"count":38651}
3:01:38: 11660 #END             {"page":38,"byte":1075,"count":34037}
3:01:38: 11700 #END             {"page":48,"byte":1059,"count":23725}
3:01:39: 11720 #END             {"page":53,"byte":1028,"count":20463}
3:01:39: 11704 #END             {"page":49,"byte":1006,"count":22966}
3:01:39: 11712 #END             {"page":50,"byte":988,"count":22369}
3:01:39: 11676 #END             {"page":42,"byte":1113,"count":29144}
3:01:39: 11748 #END             {"page":60,"byte":1054,"count":17002}
3:01:39: 11684 #END             {"page":44,"byte":1041,"count":26999}
3:01:39: 11756 #END             {"page":62,"byte":1024,"count":16165}
3:01:39: 11760 #END             {"page":63,"byte":1036,"count":15814}
3:01:39: 11740 #END             {"page":58,"byte":1075,"count":17833}
3:01:39: 11736 #END             {"page":57,"byte":1064,"count":18293}
3:01:39: 11752 #END             {"page":61,"byte":1077,"count":16607}
3:01:39: 11708 #END             {"page":51,"byte":1045,"count":21668}
3:01:39: 11768 #END             {"page":65,"byte":1041,"count":15021}
3:01:39: 11764 #END             {"page":64,"byte":1063,"count":15405}
3:01:39: 11744 #END             {"page":59,"byte":1052,"count":17394}
3:01:39: 11800 #END             {"page":73,"byte":1025,"count":12361}
3:01:39: 11792 #END             {"page":71,"byte":1053,"count":13051}
3:01:39: 11796 #END             {"page":72,"byte":1092,"count":12721}
3:01:39: 11784 #END             {"page":69,"byte":1031,"count":13677}
3:01:39: 11780 #END             {"page":68,"byte":1019,"count":13967}
3:01:39: 11772 #END             {"page":66,"byte":1068,"count":14644}
3:01:39: 11816 #END             {"page":77,"byte":1045,"count":11185}
3:01:39: 11804 #END             {"page":74,"byte":1062,"count":12071}
3:01:39: 11824 #END             {"page":79,"byte":1047,"count":10719}
3:01:39: 11820 #END             {"page":78,"byte":1035,"count":10940}
3:01:39: 11788 #END             {"page":70,"byte":987,"count":13354}
3:01:39: 11776 #END             {"page":67,"byte":1036,"count":14278}
3:01:39: 11828 #END             {"page":80,"byte":1013,"count":10519}
3:01:39: 11832 #END             {"page":81,"byte":1052,"count":10318}
3:01:39: 11812 #END             {"page":76,"byte":991,"count":11465}
3:01:39: 11808 #END             {"page":75,"byte":1043,"count":11769}
3:01:39: 11860 #END             {"page":88,"byte":1018,"count":8991}
3:01:39: 11852 #END             {"page":86,"byte":971,"count":9362}
3:01:39: 11868 #END             {"page":90,"byte":1006,"count":8641}
3:01:39: 11840 #END             {"page":83,"byte":1026,"count":9922}
3:01:39: 11872 #END             {"page":91,"byte":980,"count":8464}
3:01:39: 11892 #END             {"page":96,"byte":936,"count":7727}
3:01:39: 11836 #END             {"page":82,"byte":1052,"count":10117}
3:01:39: 11844 #END             {"page":84,"byte":973,"count":9739}
3:01:39: 11864 #END             {"page":89,"byte":1033,"count":8821}
3:01:39: 11856 #END             {"page":87,"byte":994,"count":9169}
3:01:39: 11848 #END             {"page":85,"byte":1040,"count":9544}
3:01:39: 11896 #END             {"page":97,"byte":988,"count":7562}
3:01:39: 11876 #END             {"page":92,"byte":1003,"count":8294}
3:01:39: 11888 #END             {"page":95,"byte":995,"count":7860}
3:01:39: 11880 #END             {"page":93,"byte":1052,"count":8143}
3:01:39: 11900 #END             {"page":98,"byte":977,"count":7418}
3:01:39: 11904 #END             {"page":99,"byte":999,"count":7270}
3:01:39: 11884 #END             {"page":94,"byte":931,"count":8002}
3:01:39: 11908 #END             {"page":100,"byte":977,"count":7144}


Found 14,075,927 in 100 pages
Finished 1.489 sec

Time Taken

Found 14,075,927 in 100 pages
Finished 1.489 sec

Classes Used

class Process extends Thread {

    public function __construct($storage, $page) {
        $this->storage = $storage;
        $this->page = $page;
        // $this->start();
    }

    public function run() {
        $format = "%s: %1u %s\t%s\n";
        $formatTime = "g:i:s";
        $sleep = mt_rand(0, 1); // Just for Demo

        printf($format, date($formatTime), $this->getThreadId(), "#START", "{\"page\":$this->page}");

        // Do something useful
        $data = file_get_contents(sprintf("http://api.stackoverflow.com/1.1/tags?pagesize=100&page=%s", $this->page));

        // Decode the Data from API
        $json = json_decode(gzdecode($data));

        // Lets Build A profile
        $profile = array();
        $profile['page'] = $this->page;
        $profile['byte'] = strlen($data);

        // Do more work
        $profile['count'] = array_sum(array_map(function ($v) {
            return $v->count;
        }, $json->tags));

        $this->storage->total = bcadd($this->storage->total, $profile['count']);
        // Print Information
        printf($format, date($formatTime), $this->getThreadId(), "#END\t", json_encode($profile));
    }
}

class Storage extends Stackable {
    public $total = 0;

    public function run() {
    }
}

Conclusion

Did file_get_contents just get 100 pages in just 1.489 sec over my crappy connection? Yes, it did. I tested the same code on my live server, and it took less than 0.939 sec to fetch 200 pages.

Your application can be made faster in many ways; you just have to use the right technology in the right place.

Wednesday, March 31, 2021
 
Dev
answered 7 Months ago
20

Just use pcntl_wait(). There's no need to see if the child process has finished, just call pcntl_wait() and block until the child is reaped.

Avoid passing WNOHANG so that your parent process will sit and wait for a child to finish.

You can replace the reaping code you've written (starting at line 58 '$q=true;' of your example) with this:

$status = null;

do {
    // You can use $status with pcntl_wifexited(), pcntl_wifstopped(),
    // pcntl_wifsignaled(), pcntl_wexitstatus(), pcntl_wtermsig() and
    // pcntl_wstopsig() if you need to.

    $pid = pcntl_wait($status);

} while ($pid > 0);
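
The same blocking pcntl_wait() call can also be used to cap how many children run at once: fork until the limit is reached, then block until one child is reaped before forking the next. A rough sketch, assuming the pcntl extension is loaded. runWithLimit() and its parameters are hypothetical names.

```php
<?php
/**
 * Run $worker for each task in a forked child, keeping at most $maxChildren alive.
 * Returns the number of children that exited with status 0.
 * runWithLimit() is a hypothetical helper name.
 */
function runWithLimit(array $tasks, callable $worker, int $maxChildren): int
{
    $running = 0;
    $succeeded = 0;

    // Block until one child is reaped and record whether it succeeded.
    $reapOne = function () use (&$running, &$succeeded): void {
        $pid = pcntl_wait($status); // no WNOHANG, so this blocks
        if ($pid <= 0) {
            $running = 0; // nothing left to wait for (or an error): stop waiting
            return;
        }
        $running--;
        if (pcntl_wifexited($status) && pcntl_wexitstatus($status) === 0) {
            $succeeded++;
        }
    };

    foreach ($tasks as $task) {
        if ($running >= $maxChildren) {
            $reapOne(); // wait for a free slot before forking again
        }
        $pid = pcntl_fork();
        if ($pid === -1) {
            throw new RuntimeException('fork failed');
        }
        if ($pid === 0) {
            $worker($task);
            exit(0); // child never returns to the loop
        }
        $running++;
    }

    // Reap whatever is still running.
    while ($running > 0) {
        $reapOne();
    }
    return $succeeded;
}
```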
Saturday, May 29, 2021
 
rlanvin
answered 5 Months ago
87

Try like this:

$data = array('current_login' => date('Y-m-d H:i:s'));
$this->db->set('last_login', 'current_login', false);
$this->db->where('id', 'some_id');
$this->db->update('login_table', $data);

Pay particular attention to the set() call's 3rd parameter. false prevents CodeIgniter from quoting the 2nd parameter -- this allows the value to be treated as a table column and not a string value. For any data that doesn't need special treatment, you can lump all of those declarations into the $data array.

The query generated by above code:

UPDATE `login_table`
SET last_login = current_login, `current_login` = '2018-01-18 15:24:13'
WHERE `id` = 'some_id'
Saturday, May 29, 2021
 
The_Perfect_Username
answered 5 Months ago
83

You can stop and restart the container and it will be re-sent.

With the upcoming 1.1 release, you can seek to the required offset and it will be resent.

But you will still see later messages first if they have already been retrieved so you will have to discard those too.

The second milestone has that feature and we expect it to be released next week.

Wednesday, August 18, 2021
 
Sanguine
answered 2 Months ago
55

It seems like I suggest this daily, but have you looked at Gearman? There's even a well documented PECL class.

Gearman is a work queue system. You'd create workers that connect and listen for jobs, and clients that connect and send jobs. The client can either wait for the requested job to be completed, or it can fire it and forget. At your option, workers can even send back status updates, and how far through the process they are.

In other words, you get the benefits of multiple processes or threads, without having to worry about processes and threads. The clients and workers can even be on different machines.
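
To make the client/worker split concrete, here is a minimal sketch using the PECL gearman classes. It assumes the extension is installed and a gearmand server is listening on 127.0.0.1:4730. The job name, the JSON payload format and the helper function names are all hypothetical choices for illustration.

```php
<?php
// Hypothetical job name; any string works as long as client and worker agree on it.
const JOB_NAME = 'send_task_emails';

// Tasks travel as JSON strings, since a Gearman workload is just a string.
function encodeTask(array $task): string
{
    return json_encode($task);
}

function decodeTask(string $workload): array
{
    return json_decode($workload, true);
}

// Client side: queue every task and return immediately (fire and forget).
function enqueueTasks(array $tasks): void
{
    $client = new GearmanClient();
    $client->addServer('127.0.0.1', 4730);
    foreach ($tasks as $task) {
        $client->doBackground(JOB_NAME, encodeTask($task));
    }
}

// Worker side: run this in one or more long-lived CLI processes.
function runWorker(callable $handler): void
{
    $worker = new GearmanWorker();
    $worker->addServer('127.0.0.1', 4730);
    $worker->addFunction(JOB_NAME, function (GearmanJob $job) use ($handler) {
        $handler(decodeTask($job->workload()));
    });
    while ($worker->work()) {
        // Each call to work() handles one job and then waits for the next.
    }
}
```

Starting more worker processes is then the way to get more parallelism; the script that creates the jobs does not change.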

Friday, August 27, 2021
 
adizone
answered 2 Months ago