Parallelizing Scripts: Difference between revisions

From GRASS-Wiki
Jump to navigation Jump to search
(→‎Python: Added another (simple) implementation of a parallelization script)
Line 97: Line 97:




This can also be accomplished fairly simply by using tracking the number of workers available and using grass.start_command as long as another job can be performed:
<source lang="python">
import os
import multiprocessing as multi
import grass.script as grass
# Find number of workers that can be used on system. This variable could
# also be set manually.
workers = multi.cpu_count()
# This is only a set of examples for r.slope.aspect jobs where the maps are
# named serially.
jobs = range(20)
# Check if workers are already being used
if workers is 1 and "WORKERS" in os.environ:
    workers = int(os.environ["WORKERS"])
if workers < 1:
    workers = 1
# Initialize process dictionary
proc = {}
# Loop over jobs
for i in range(jobs):
    # Insert job into dictinoary to keep track of it
    proc[i] = grass.start_command('r.slope.aspect',
                                  elevation='elev_' + str(i),
                                  slope='slope_' + str(i))
    # If the workers are used up, wait for all of them from the last group to
    # finish.
    if i % workers is 0:
        for j in range(workers):
            proc.[i - j].wait()
# Make sure all workers are finished.
for i in range(jobs):
    if proc[i].wait() is not 0:
        grass.fatal(_('Problem running analysis on evel_' + str(i) + '.')
</source>


=== GNU Parallel ===
=== GNU Parallel ===

Revision as of 14:48, 7 August 2012

Bourne shell script

  • Poor-man's multithreading using Bourne shell script & backgrounding. WARNING: not all GRASS modules and scripts are safe to have other things happening in the same mapset while they are running. Try at your own risk after performing a suitable safety audit. e.g. Make sure g.region is not run, externally changing the region settings.

Example:

 ### r.sun mode 1 loop ###
 SUNRISE=7.67
 SUNSET=16.33
 STEP=0.01
 # | wc -l   867

 if [ -z "$WORKERS" ] ; then
    WORKERS=4
 fi

 DAY=355
 for TIME in `seq $SUNRISE $STEP $SUNSET` ; do
    echo "time=$TIME"
    CMD="r.sun -s elevin=gauss day=$DAY time=$TIME \
          beam_rad=rad1_test.${DAY}_${TIME}_beam --quiet"
 
    # poor man's multi-threading for a multi-core CPU
    MODULUS=`echo "$TIME $STEP $WORKERS" | awk '{print $1 % ($2 * $3)}'`
    if [ "$MODULUS" = "$STEP" ] || [ "$TIME" = "$SUNSET" ] ; then
       # stall to let the background jobs finish
       $CMD
       sleep 2
       wait
       #while [ `pgrep -c r.sun` -ne 0 ] ; do
       #   sleep 5
       #done
    else
      $CMD &
    fi
 done
 wait   # wait for background jobs to finish to avoid race conditions
  • This approach has been used in the r3.in.xyz addon script.
  • Another example using r.sun Mode 2 can be found on the r.sun wiki page.
  • See the i.landsat.rgb and i.oif examples in 6.5svn.


Backgrounding code which sets environment variables with `eval` requires the use of grouping within ()s:

 eval `(
    r.univar -ge map="$RED"   percentile=95 | grep '^percentile_' | sed -e 's/^/R_/' &
    r.univar -ge map="$GREEN" percentile=95 | grep '^percentile_' | sed -e 's/^/G_/' &
    r.univar -ge map="$BLUE"  percentile=95 | grep '^percentile_' | sed -e 's/^/B_/'
    wait
 )`

Python

  • Due to the "GIL" in Python 2.x-3.0, pure python will only run on a single core, even when multi-threaded. All multithreading schemes & modules for (pure) Python are therefore wrappers around multiple system processes, which are a lot more expensive than threads to create and destroy. Thus it is more efficient to create large high-level Python 'threads' (processes) than to bury them deep inside of a loop.

Example of multiprocessing at the GRASS module level:

Similar to the Bourne shell example above, but using the subprocess python module. The i.oif script in GRASS7 is using this method.

bands = [1,2,3,4,5,7]

# run all bands in parallel
if "WORKERS" in os.environ:
    workers = int(os.environ["WORKERS"])
else:
    workers = 6

proc = {}
pout = {}

# spawn jobs in the background
for band in bands:
    grass.debug("band %d, <%s>  %% %d" % (band, image[band], band % workers))
    proc[band] = grass.pipe_command('r.univar', flags = 'g', map = image[band])
    if band % workers is 0:
	# wait for the ones launched so far to finish
	for bandp in bands[:band]:
	    if not proc[bandp].stdout.closed:
		pout[bandp] = proc[bandp].communicate()[0]
	    proc[bandp].wait()

# wait for jobs to finish, collect the output
for band in bands:
    if not proc[band].stdout.closed:
	pout[band] = proc[band].communicate()[0]
    proc[band].wait()

# parse the results
for band in bands:
    kv = grass.parse_key_val(pout[band])
    stddev[band] = float(kv['stddev'])


This can also be accomplished fairly simply by using tracking the number of workers available and using grass.start_command as long as another job can be performed:

import os
import multiprocessing as multi

import grass.script as grass

# Find number of workers that can be used on system. This variable could 
# also be set manually.
workers = multi.cpu_count()
# This is only a set of examples for r.slope.aspect jobs where the maps are
# named serially.
jobs = range(20)

# Check if workers are already being used
if workers is 1 and "WORKERS" in os.environ:
    workers = int(os.environ["WORKERS"])
if workers < 1:
    workers = 1

# Initialize process dictionary
proc = {}

# Loop over jobs
for i in range(jobs):
    # Insert job into dictinoary to keep track of it
    proc[i] = grass.start_command('r.slope.aspect',
                                  elevation='elev_' + str(i),
                                  slope='slope_' + str(i))
    # If the workers are used up, wait for all of them from the last group to
    # finish.
    if i % workers is 0:
        for j in range(workers):
            proc.[i - j].wait()

# Make sure all workers are finished.
for i in range(jobs):
    if proc[i].wait() is not 0:
        grass.fatal(_('Problem running analysis on evel_' + str(i) + '.')

GNU Parallel

  • GNU Parallel is an advanced version of xargs which makes it easy to write parallel shell scripts.
See also the unrelated C "parallel" program which comes with the Linux "moreutils" package. It is tighter but less featureful than GNU Parallel.
 ### r.sun mode 1 loop ###
 SUNRISE=7.67
 SUNSET=16.33
 STEP=0.01
 # | wc -l   867
 
 DAY=355

 seq $SUNRISE $STEP $SUNSET | parallel -j+0 r.sun -s elevin=gauss day=$DAY \
       time={} beam_rad=rad1_test.${DAY}_{}_beam --quiet

GNU Parallel can also distribute work to other computers, see the video on how:

http://www.youtube.com/watch?v=LlXDtd_pRaY

xargs

  • xargs can be told to limit itself to a certain number of processes at once. The r.sun example is almost exactly as with GNU Parallel, except for `-P $CORES -n 1` instead of `-j+0`.

For example, convert a large number of Raster3D maps into 2D rasters:

  NUM_CORES=6
  g.mlist rast3d | xargs -P $NUM_CORES -n 1 -I{} \
     r3.to.rast -r in={} out={} --quiet

For another example, here we spit apart a PDF and convert each page to a PNG image:

  pdftk pdfmovie.pdf burst
  NUM_CORES=6
  ls -1 pg_*.pdf | xargs -P $NUM_CORES -n 1 -I{} \
     sh -c "pdftoppm {} | pnmcut -width 1280 -height 1024 -left 0 -top 0 | \
               pnmtopng > \`basename {} .pdf\`.png"