// libstdc++
// -*- C++ -*-

// Copyright (C) 2007, 2008, 2009, 2010 Free Software Foundation, Inc.
//
// This file is part of the GNU ISO C++ Library. This library is free
// software; you can redistribute it and/or modify it under the terms
// of the GNU General Public License as published by the Free Software
// Foundation; either version 3, or (at your option) any later
// version.

// This library is distributed in the hope that it will be useful, but
// WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// General Public License for more details.

// Under Section 7 of GPL version 3, you are granted additional
// permissions described in the GCC Runtime Library Exception, version
// 3.1, as published by the Free Software Foundation.

// You should have received a copy of the GNU General Public License and
// a copy of the GCC Runtime Library Exception along with this program;
// see the files COPYING3 and COPYING.RUNTIME respectively. If not, see
// <http://www.gnu.org/licenses/>.

/** @file parallel/workstealing.h
 *  @brief Parallelization of embarrassingly parallel execution by
 *  means of work-stealing.
 *
 *  Work stealing is described in
 *
 *  R. D. Blumofe and C. E. Leiserson.
 *  Scheduling multithreaded computations by work stealing.
 *  Journal of the ACM, 46(5):720–748, 1999.
 *
 *  This file is a GNU parallel extension to the Standard C++ Library.
 */

// Written by Felix Putze.
#ifndef _GLIBCXX_PARALLEL_WORKSTEALING_H
#define _GLIBCXX_PARALLEL_WORKSTEALING_H 1

#include <parallel/parallel.h>
#include <parallel/random_number.h>
#include <parallel/compatibility.h>

namespace __gnu_parallel
{

#define _GLIBCXX_JOB_VOLATILE volatile

  /** @brief One job for a certain thread.
   *
   *  Describes the index range [_M_first, _M_last] that a thread still
   *  has to process.  The members are only volatile-qualified: concurrent
   *  updates of _M_first by other threads go through __fetch_and_add(),
   *  while _M_last and _M_load are written by the owning thread only. */
  template<typename _DifferenceTp>
    struct _Job
    {
      typedef _DifferenceTp _DifferenceType;

      /** @brief First element.
       *
       *  Changed by owning and stealing thread. By stealing thread,
       *  always incremented. */
      _GLIBCXX_JOB_VOLATILE _DifferenceType _M_first;

      /** @brief Last element.
       *
       *  Changed by owning thread only. */
      _GLIBCXX_JOB_VOLATILE _DifferenceType _M_last;

      /** @brief Number of elements, i.e. @c _M_last-_M_first+1.
       *
       *  Changed by owning thread only.  May be stale (and then
       *  inconsistent with _M_first/_M_last) after a thief advanced
       *  _M_first; thieves detect this with a consistency check. */
      _GLIBCXX_JOB_VOLATILE _DifferenceType _M_load;
    };

  /** @brief Work stealing algorithm for random access iterators.
   *
   *  The index range is split evenly among the threads of an OpenMP team.
   *  Each thread processes its own range in blocks of
   *  _Settings::workstealing_chunk_size elements.  Once it runs dry, it
   *  steals about half of the remaining range of a randomly chosen
   *  thread.  Additional memory does not depend on the input length: one
   *  cache-line-padded _Job per thread.  Synchronization on the job
   *  lists is done with atomic fetch-and-add operations.
   *
   *  Per-thread partial results are combined under a lock, in whatever
   *  order the threads acquire it.  A thread's partial result may cover
   *  non-contiguous index ranges because of stealing.  @a __r is
   *  therefore presumably required to be associative and commutative.
   *
   *  @param __begin Begin iterator of element sequence.
   *  @param __end End iterator of element sequence.
   *  @param __op User-supplied functor (comparator, predicate, adding
   *  functor, ...).
   *  @param __f Functor to @a process an element with __op (depends on
   *  desired functionality, e. g. for std::for_each(), ...).  On return,
   *  its @c _M_finish_iterator member is set to @c __begin+length.
   *  @param __r Functor to @a add a single result to the already
   *  processed elements (depends on functionality).
   *  @param __base Base value for reduction.
   *  @param __output Reference to the variable the final result is
   *  written to.  It starts as @a __base and has every per-thread partial
   *  result folded in with @a __r.
   *  @param __bound Maximum number of elements processed (e. g. for
   *  std::count_n()).  A negative value means the whole range
   *  [__begin, __end).
   *  @return User-supplied functor (that may contain a part of the result).
   */
  template<typename _RAIter,
           typename _Op,
           typename _Fu,
           typename _Red,
           typename _Result>
    _Op
    __for_each_template_random_access_workstealing(_RAIter __begin,
                                                   _RAIter __end, _Op __op,
                                                   _Fu& __f, _Red __r,
                                                   _Result __base,
                                                   _Result& __output,
      typename std::iterator_traits<_RAIter>::difference_type __bound)
    {
      _GLIBCXX_CALL(__end - __begin)

      typedef std::iterator_traits<_RAIter> _TraitsType;
      typedef typename _TraitsType::difference_type _DifferenceType;

      const _Settings& __s = _Settings::get();

      _DifferenceType __chunk_size =
          static_cast<_DifferenceType>(__s.workstealing_chunk_size);

      // How many jobs?
      _DifferenceType __length = (__bound < 0) ? (__end - __begin) : __bound;

      // To avoid false sharing in a cache line: per-thread _Job entries
      // are placed __stride elements apart.  That is roughly ten cache
      // lines' worth of bytes between consecutive threads' entries.
      const int __stride = (__s.cache_line_size * 10
                            / sizeof(_Job<_DifferenceType>) + 1);

      // Total number of threads currently working.
      _ThreadIndex __busy = 0;

      _Job<_DifferenceType> *__job;

      omp_lock_t __output_lock;
      omp_init_lock(&__output_lock);

      // Write base value to output.
      __output = __base;

      // No more threads than jobs, at least one thread.
      _ThreadIndex __num_threads = __gnu_parallel::max<_ThreadIndex>
        (1, __gnu_parallel::min<_DifferenceType>(__length,
                                                 __get_max_threads()));

#     pragma omp parallel shared(__busy) num_threads(__num_threads)
      {
#       pragma omp single
        {
          // The team may be smaller than requested; use its real size.
          __num_threads = omp_get_num_threads();

          // Create job description array.
          __job = new _Job<_DifferenceType>[__num_threads * __stride];
        }
        // (Implicit barrier at the end of 'single': __job and
        // __num_threads are visible to all threads from here on.)

        // Initialization phase.

        // Flags for every thread if it is doing productive work.
        bool __iam_working = false;

        // Thread id.
        _ThreadIndex __iam = omp_get_thread_num();

        // This job.
        _Job<_DifferenceType>& __my_job = __job[__iam * __stride];

        // Random number (for work stealing).
        _ThreadIndex __victim;

        // Local value for reduction.  Only meaningful once
        // __have_result is true: a value-initialized _Result need not be
        // a neutral element of __r.
        _Result __result = _Result();
        bool __have_result = false;

        // Number of elements to steal in one attempt.
        _DifferenceType __steal;

        // Every thread has its own random number generator
        // (modulo __num_threads).
        _RandomNumber __rand_gen(__iam, __num_threads);

        // This thread is currently working.
#       pragma omp atomic
        ++__busy;

        __iam_working = true;

        // How many jobs per thread? last thread gets the rest.
        __my_job._M_first = static_cast<_DifferenceType>
          (__iam * (__length / __num_threads));

        __my_job._M_last = (__iam == (__num_threads - 1)
                            ? (__length - 1)
                            : ((__iam + 1) * (__length / __num_threads) - 1));
        __my_job._M_load = __my_job._M_last - __my_job._M_first + 1;

        // Init result with _M_first value (to have a base value for
        // reduction).  Because __num_threads <= __length, this branch is
        // taken by every thread unless __length == 0.
        if (__my_job._M_first <= __my_job._M_last)
          {
            // Cannot use volatile variable directly.
            _DifferenceType __my_first = __my_job._M_first;
            __result = __f(__op, __begin + __my_first);
            __have_result = true;
            ++__my_job._M_first;
            --__my_job._M_load;
          }

        _RAIter __current;

        // No thread may start stealing before every job is initialized.
#       pragma omp barrier

        // Actual work phase
        // Work on own or stolen current start
        while (__busy > 0)
          {
            // Work until no productive thread left.
#           pragma omp flush(__busy)

            // Thread has own work to do
            while (__my_job._M_first <= __my_job._M_last)
              {
                // fetch-and-add call
                // Reserve current job block (size __chunk_size) in my queue.
                // Thieves advance _M_first concurrently with the same
                // atomic operation, so every caller gets a disjoint block.
                _DifferenceType __current_job =
                  __fetch_and_add<_DifferenceType>(&(__my_job._M_first),
                                                   __chunk_size);

                // Update _M_load, to make the three values consistent,
                // _M_first might have been changed in the meantime
                __my_job._M_load = __my_job._M_last - __my_job._M_first + 1;
                for (_DifferenceType __job_counter = 0;
                     __job_counter < __chunk_size
                       && __current_job <= __my_job._M_last;
                     ++__job_counter)
                  {
                    // Yes: process it!
                    __current = __begin + __current_job;
                    ++__current_job;

                    // Do actual work.
                    __result = __r(__result, __f(__op, __current));
                  }

#               pragma omp flush(__busy)
              }

            // After reaching this point, a thread's job list is empty.
            if (__iam_working)
              {
                // This thread no longer has work.
#               pragma omp atomic
                --__busy;

                __iam_working = false;
              }

            _DifferenceType __supposed_first, __supposed_last,
                            __supposed_load;
            do
              {
                // Find a random nonempty job and do a consistency check.
                // The victim may be this thread itself (__rand_gen ranges
                // over all __num_threads).  Its own job is empty here, so
                // the load/consistency test below is expected to reject it.
                __yield();
#               pragma omp flush(__busy)
                __victim = __rand_gen();
                __supposed_first = __job[__victim * __stride]._M_first;
                __supposed_last = __job[__victim * __stride]._M_last;
                __supposed_load = __job[__victim * __stride]._M_load;
              }
            while (__busy > 0
                   && ((__supposed_load <= 0)
                       || ((__supposed_first + __supposed_load - 1)
                           != __supposed_last)));

            if (__busy == 0)
              break;

            if (__supposed_load > 0)
              {
                // Has work and work to do.
                // Number of elements to steal (at least one).
                __steal = (__supposed_load < 2) ? 1 : __supposed_load / 2;

                // Push __victim's current start forward.
                // NOTE(review): the snapshot above is three separate reads
                // and is not revalidated here.  Suppose the victim drains
                // its range and installs a freshly stolen range between
                // the snapshot and this fetch-and-add.  Then __stolen_first
                // comes from the new range, while the clamp below still
                // uses the stale __supposed_last.  Possible ABA race
                // (indices processed twice or skipped) -- confirm.
                _DifferenceType __stolen_first =
                  __fetch_and_add<_DifferenceType>
                  (&(__job[__victim * __stride]._M_first), __steal);
                _DifferenceType __stolen_try = (__stolen_first + __steal
                                                - _DifferenceType(1));

                // NOTE(review): plain (non-atomic) store to a field that
                // other thieves update with __fetch_and_add; a concurrent
                // fetch-and-add on this job can be overwritten here.
                __my_job._M_first = __stolen_first;
                __my_job._M_last = __gnu_parallel::min(__stolen_try,
                                                       __supposed_last);
                __my_job._M_load = __my_job._M_last - __my_job._M_first + 1;

                // Has potential work again.
#               pragma omp atomic
                ++__busy;
                __iam_working = true;

#               pragma omp flush(__busy)
              }
#           pragma omp flush(__busy)
          } // end while __busy > 0

        // Add accumulated result to output.  A thread that never
        // evaluated __f (only possible when __length == 0) holds only a
        // value-initialized _Result, which must not be folded into the
        // output; __output then keeps __base.
        if (__have_result)
          {
            omp_set_lock(&__output_lock);
            __output = __r(__output, __result);
            omp_unset_lock(&__output_lock);
          }
      }

      delete[] __job;

      // Points to last element processed (needed as return value for
      // some algorithms like transform)
      __f._M_finish_iterator = __begin + __length;

      omp_destroy_lock(&__output_lock);

      return __op;
    }
} // end namespace

#endif /* _GLIBCXX_PARALLEL_WORKSTEALING_H */