
In the example code below, I'd like to recover the return value of the function worker. How can I go about doing this? Where is this value stored?

Example Code:

import multiprocessing

def worker(procnum):
    '''worker function'''
    print(str(procnum) + ' represent!')
    return procnum


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print(jobs)

Output:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]

I can't seem to find the relevant attribute in the objects stored in jobs.

 Answers


Use a shared variable to communicate. For example, like this:

import multiprocessing


def worker(procnum, return_dict):
    """worker function"""
    print(str(procnum) + " represent!")
    return_dict[procnum] = procnum


if __name__ == "__main__":
    manager = multiprocessing.Manager()
    return_dict = manager.dict()
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i, return_dict))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print(return_dict.values())
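
If you only need the return values rather than the Process objects themselves, a multiprocessing.Pool collects them for you. A minimal sketch of that alternative (not part of the original answer), reusing the same worker function:

from multiprocessing import Pool


def worker(procnum):
    """worker function"""
    print(str(procnum) + " represent!")
    return procnum


if __name__ == "__main__":
    with Pool(processes=5) as pool:
        # map blocks until every worker has finished and returns the results in order
        results = pool.map(worker, range(5))
    print(results)  # [0, 1, 2, 3, 4]
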
answered by capsid on Tuesday, June 1, 2021

I don't think you can achieve what you want in this way. The anonymous function is invoked by the chunk method, so anything you return from your closure is swallowed by chunk. Since chunk potentially invokes this anonymous function N times, it makes no sense for it to return anything back from the closures it invokes.

However, you can give the closure access to a method-scoped variable and allow the closure to write to that variable, which lets you indirectly return results. You do this with the use keyword, making sure to pass the method-scoped variable in by reference, which is achieved with the & modifier.

This will work, for example:

$count = 0;
DB::table('users')->chunk(200, function($users) use (&$count)
{
    Log::debug(count($users)); // will log the current iterations count
    $count = $count + count($users); // will write the total count to our method var
});
Log::debug($count); // will log the total count of records
answered by Teno on Saturday, May 29, 2021

Have the parent process wait for the child to exit:

#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

pid_t pid = fork();
if (pid == -1) {
  // error, no child created
}
else if (pid == 0) {
  // child
}
else {
  // parent
  int status;
  if (waitpid(pid, &status, 0) == -1) {
    // handle error
  }
  else {
    // child exit code in status
    // use WIFEXITED, WEXITSTATUS, etc. on status
  }
}
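
Since the original question is about Python, here is a rough sketch of the same wait-and-decode pattern using the os module on a POSIX system (an illustration, not part of this answer):

import os

pid = os.fork()
if pid == 0:
    # child: exit with a status code the parent can read
    os._exit(42)
else:
    # parent: wait for the child, then decode its exit status
    _, status = os.waitpid(pid, 0)
    if os.WIFEXITED(status):
        print('child exited with', os.WEXITSTATUS(status))  # 42

Note that an exit status only carries a small integer (0-255), so it is suited to signalling success or failure rather than returning arbitrary data.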
answered by Palladium on Friday, October 8, 2021

You should define your worker function before creating the Pool. When the Pool is created, the worker subprocesses are forked from that point; they do not execute code beyond that line and therefore never see your worker function.

Also, you should replace pool.map with pool.starmap to fit your input.

A simplified example:

from multiprocessing import Pool

def co_refresh(a, b, c, d):
    print(a, b, c, d)

input_list = [f'a{i} b{i} c{i} d{i}'.split() for i in range(4)]
# [['a0', 'b0', 'c0', 'd0'], ['a1', 'b1', 'c1', 'd1'], ['a2', 'b2', 'c2', 'd2'], ['a3', 'b3', 'c3', 'd3']]

pool = Pool(processes=3)
pool.starmap(co_refresh, input_list)
pool.close()
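
The difference is that starmap unpacks each inner list into separate positional arguments, while map would pass the whole list as a single argument. Roughly (an illustration, not part of the original answer):

pool.map(co_refresh, input_list)      # calls co_refresh(['a0', 'b0', 'c0', 'd0']) -> TypeError
pool.starmap(co_refresh, input_list)  # calls co_refresh('a0', 'b0', 'c0', 'd0')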
answered by shrini1000 on Saturday, October 9, 2021

In theory, and based on the current implementation of SharedMemory, the warnings are to be expected. The main reason is that every shared memory object you create is tracked twice: first, when it is produced by one of the processes in the Pool, and second, when it is consumed by the main process. This is mainly because the current implementation of the SharedMemory constructor registers the shared memory object regardless of whether the create argument is set to True or False.

So, when you call shm.unlink() in the main process, you are deleting the shared memory object entirely before its producer (some process in the Pool) gets around to cleaning it up. As a result, when the pool gets destroyed, each of its members (if they ever got a task) has to clean up after itself. The first warning about leaked resources probably refers to the shared memory objects actually created by processes in the Pool that were never unlinked by those same processes. The No such file or directory warnings are due to the fact that the main process unlinked the files associated with the shared memory objects before the processes in the Pool were destroyed.

The solution provided in the linked bug report would likely prevent consuming processes from having to spawn additional resource trackers, but it does not quite prevent the issue that arises when a consuming process decides to delete a shared memory object that it did not create. This is because the process that produced the shared memory object will still have to do some clean up, i.e. some unlinking, before it exits or is destroyed.

The fact that you are not seeing those warnings is quite puzzling. But it may well have to do with a combination of OS scheduling, unflushed buffers in the child process and the start method used when creating a process pool.

For comparison, when I use fork as a start method on my machine, I get the warnings. Otherwise, I see no warnings when spawn and forkserver are used. I added argument parsing to your code to make it easy to test different start methods:

#!/usr/bin/env python3
# shm_test_script.py
"""
Use --start_method or -s to pick a process start method when creating a process Pool.
Use --tasks or -t to control how many shared memory objects should be created.
Use --pool_size or -p to control the number of child processes in the created pool.
"""
import argparse
import multiprocessing
import multiprocessing.shared_memory as shared_memory


def create_shm():
    shm = shared_memory.SharedMemory(create=True, size=30000000)
    shm.close()
    return shm.name


def main(tasks, start_method, pool_size):
    multiprocessing.set_start_method(start_method, force=True)
    pool = multiprocessing.Pool(processes=pool_size)
    tasks = [pool.apply_async(create_shm) for _ in range(tasks)]

    for task in tasks:
        name = task.get()
        print('Getting {}'.format(name))
        shm = shared_memory.SharedMemory(name=name, create=False)
        shm.close()
        shm.unlink()
    pool.terminate()
    pool.join()


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        '--start_method', '-s',
        help='The multiprocessing start method to use. Default: %(default)s',
        default=multiprocessing.get_start_method(),
        choices=multiprocessing.get_all_start_methods()
    )
    parser.add_argument(
        '--pool_size', '-p',
        help='The number of processes in the pool. Default: %(default)s',
        type=int,
        default=multiprocessing.cpu_count()
    )
    parser.add_argument(
        '--tasks', '-t',
        help='Number of shared memory objects to create. Default: %(default)s',
        default=200,
        type=int
    )
    args = parser.parse_args()
    main(args.tasks, args.start_method, args.pool_size)

Given that fork is the only method that ends up displaying the warnings (for me, at least), maybe there is actually something to the following statement about it:

The parent process uses os.fork() to fork the Python interpreter. The child process, when it begins, is effectively identical to the parent process. All resources of the parent are inherited by the child process. Note that safely forking a multithreaded process is problematic.

It's not surprising that the warnings from child processes persist/propagate if all resources of the parent are inherited by the child processes.

If you're feeling particularly adventurous, you can edit multiprocessing/resource_tracker.py and update its warnings.warn lines to include os.getpid() in the printed strings. For instance, changing any warning containing "resource_tracker:" to "resource_tracker %d: " % (os.getpid()) should be sufficient. If you do this, you will notice that the warnings come from various processes that are neither the child processes nor the main process itself.

With those changes made, the following should help double-check that there are as many complaining resource trackers as processes in your Pool, and that their process IDs differ from those of the main process and the child processes:

chmod +x shm_test_script.py
./shm_test_script.py -p 10 -t 50 -s fork > log 2> err
awk -F ':' 'length($4) > 1 { print $4 }' err | sort | uniq -c

That should display ten lines, each prepended with the number of complaints from the corresponding resource tracker. Every line should also contain a PID that differs from the main and child processes.

To recap, each child process should have its own resource tracker if it receives any work. Since you're not explicitly unlinking the shared memory objects in the child processes, the resources will likely get cleaned up when the child processes are destroyed.

I hope this helps answer some, if not all, of your questions.

answered by Asperi on Monday, November 1, 2021