00001
00002
00003
00004
00005
00006
00007
00008 #ifndef EDG_WORKLOAD_COMMON_TASK_TASK_H
00009 #define EDG_WORKLOAD_COMMON_TASK_TASK_H
00010
00011 #include <deque>
00012 #include <cassert>
00013 #include <boost/function.hpp>
00014 #include <boost/thread/mutex.hpp>
00015 #include <boost/thread/condition.hpp>
00016 #include <boost/thread/thread.hpp>
00017
00018 namespace edg {
00019 namespace workload {
00020 namespace common {
00021 namespace task {
00022
00023
// Exception tags thrown by Pipe<T>. They carry no data: the type alone
// identifies the condition.

// Thrown by Pipe::read() and Pipe::try_read() when the queue is empty and the
// write end has been marked closed.
struct Eof
{
};

// Thrown by Pipe::write() and Pipe::try_write() when the read end has been
// marked closed.
struct SigPipe
{
};

// Thrown by Pipe::try_read() when the queue is empty but the write end is
// not closed.
struct Empty
{
};

// Thrown by Pipe::try_write() when the queue already holds its maximum number
// of elements.
struct Full
{
};
00028
template<typename T> class Pipe;

// Copyable handle to the reading side of a Pipe<T>.
//
// The handle stores only a pointer to the pipe and never deletes it, so the
// pipe has to outlive every handle that refers to it. Copying a handle copies
// the pointer; all copies refer to the same pipe.
//
// A default-constructed handle is not attached to any pipe. Calling read(),
// try_read(), open() or close() on it is a precondition violation, checked
// with assert() in debug builds.
template<typename T>
class PipeReadEnd {

  friend class Pipe<T>;

  Pipe<T>* m_parent;  // not owned; null until attached to a pipe

public:
  // Deliberately not explicit: a Pipe<T>* converts implicitly to a handle.
  PipeReadEnd(Pipe<T>* parent = 0): m_parent(parent) {}

  // Forwards to Pipe<T>::read() and returns its result.
  T read()
  {
    assert(m_parent && "PipeReadEnd is not attached to a Pipe");
    return m_parent->read();
  }

  // Forwards to Pipe<T>::try_read() and returns its result.
  T try_read()
  {
    assert(m_parent && "PipeReadEnd is not attached to a Pipe");
    return m_parent->try_read();
  }

  // Registers one more reader on the pipe (Pipe<T>::open_read_end()).
  void open()
  {
    assert(m_parent && "PipeReadEnd is not attached to a Pipe");
    m_parent->open_read_end();
  }

  // Unregisters one reader from the pipe (Pipe<T>::close_read_end()).
  void close()
  {
    assert(m_parent && "PipeReadEnd is not attached to a Pipe");
    m_parent->close_read_end();
  }
};
00052
00053 template<typename T>
00054 class PipeWriteEnd {
00055
00056 friend class Pipe<T>;
00057
00058 Pipe<T>* m_parent;
00059
00060 public:
00061 PipeWriteEnd(Pipe<T>* parent = 0): m_parent(parent) {}
00062 PipeWriteEnd(const PipeWriteEnd& rhs): m_parent(rhs.m_parent) {}
00063 PipeWriteEnd& operator=(const PipeWriteEnd& rhs)
00064 {
00065 m_parent = rhs.m_parent;
00066 return *this;
00067 }
00068 ~PipeWriteEnd() {}
00069
00070 void write(T obj) { m_parent->write(obj); }
00071 void try_write(T obj) { m_parent->try_write(obj); }
00072 void open() { m_parent->open_write_end(); }
00073 void close() { m_parent->close_write_end(); }
00074 };
00075
00076
00077
00078
00079
00080 template<typename T>
00081 class Pipe {
00082
00083 boost::mutex m_mutex;
00084 std::deque<T> m_queue;
00085 size_t m_max_size;
00086 boost::condition m_not_full;
00087 boost::condition m_not_empty;
00088
00089 int m_num_writers;
00090 int m_num_readers;
00091
00092 bool m_write_end_is_closed;
00093 bool m_read_end_is_closed;
00094
00095
00096 Pipe(const Pipe& rhs);
00097 Pipe& operator=(const Pipe& rhs);
00098
00099 public:
00100 Pipe(size_t max_size = 10)
00101 : m_max_size(max_size), m_num_writers(0), m_num_readers(0),
00102 m_write_end_is_closed(false), m_read_end_is_closed(false)
00103 {
00104 }
00105 ~Pipe() {}
00106
00107 PipeReadEnd<T> read_end() { return PipeReadEnd<T>(this); }
00108 PipeWriteEnd<T> write_end() { return PipeWriteEnd<T>(this); }
00109
00110
00111 T read()
00112 {
00113 boost::mutex::scoped_lock lock(m_mutex);
00114
00115 while (m_queue.empty()) {
00116 if (m_write_end_is_closed) {
00117 throw Eof();
00118 }
00119 m_not_empty.wait(lock);
00120 }
00121 T result = m_queue.front();
00122 m_queue.pop_front();
00123 m_not_full.notify_one();
00124
00125 return result;
00126 }
00127
00128 T try_read()
00129 {
00130 boost::mutex::scoped_lock lock(m_mutex);
00131
00132 if (m_queue.empty()) {
00133 if (m_write_end_is_closed) {
00134 throw Eof();
00135 } else {
00136 throw Empty();
00137 }
00138 }
00139
00140 T result = m_queue.front();
00141 m_queue.pop_front();
00142 m_not_full.notify_one();
00143
00144 return result;
00145 }
00146
00147 void write(const T& obj)
00148 {
00149 boost::mutex::scoped_lock lock(m_mutex);
00150
00151 if (m_read_end_is_closed) {
00152 throw SigPipe();
00153 }
00154
00155 while (m_queue.size() == m_max_size) {
00156 m_not_full.wait(lock);
00157 if (m_read_end_is_closed) {
00158 throw SigPipe();
00159 }
00160 }
00161 m_queue.push_back(obj);
00162 m_not_empty.notify_one();
00163 }
00164
00165 void try_write(const T& obj)
00166 {
00167 boost::mutex::scoped_lock lock(m_mutex);
00168
00169 if (m_read_end_is_closed) {
00170 throw SigPipe();
00171 } else if (m_queue.size() == m_max_size) {
00172 throw Full();
00173 }
00174
00175 m_queue.push_back(obj);
00176 m_not_empty.notify_one();
00177 }
00178
00179 bool empty()
00180 {
00181 boost::mutex::scoped_lock lock(m_mutex);
00182
00183 return m_queue.empty();
00184 }
00185
00186 void open_read_end(int num = 1)
00187 {
00188 boost::mutex::scoped_lock lock(m_mutex);
00189
00190 m_num_readers += num;
00191 }
00192
00193 void open_write_end(int num = 1)
00194 {
00195 boost::mutex::scoped_lock lock(m_mutex);
00196
00197 m_num_writers += num;
00198 }
00199
00200 void close_read_end()
00201 {
00202 boost::mutex::scoped_lock lock(m_mutex);
00203
00204 --m_num_readers;
00205 if (m_num_readers == 0) {
00206
00207 m_read_end_is_closed = true;
00208
00209
00210 m_not_full.notify_all();
00211 }
00212 }
00213
00214 void close_write_end()
00215 {
00216 boost::mutex::scoped_lock lock(m_mutex);
00217
00218 --m_num_writers;
00219 if (m_num_writers == 0) {
00220
00221 m_write_end_is_closed = true;
00222
00223
00224 m_not_empty.notify_all();
00225 }
00226 }
00227
00228 };
00229
00230 template<typename T_IN>
00231 class PipeReader
00232 {
00233 PipeReadEnd<T_IN> m_read_end;
00234
00235 friend class Task;
00236 void read_from(const PipeReadEnd<T_IN>& from) { m_read_end = from; }
00237
00238 protected:
00239 PipeReader() {}
00240 PipeReadEnd<T_IN>& read_end() { return m_read_end; }
00241
00242 public:
00243 virtual ~PipeReader() {}
00244 virtual void run() = 0;
00245 };
00246
00247
00248 template<typename T_OUT>
00249 class PipeWriter
00250 {
00251 PipeWriteEnd<T_OUT> m_write_end;
00252
00253 friend class Task;
00254 void write_to(const PipeWriteEnd<T_OUT>& to) { m_write_end = to; }
00255
00256 protected:
00257 PipeWriter() {}
00258 PipeWriteEnd<T_OUT>& write_end() { return m_write_end; }
00259
00260 public:
00261 virtual ~PipeWriter() {}
00262 virtual void run() = 0;
00263 };
00264
00265 template<typename T_IN, typename T_OUT>
00266 class PipeForwarder
00267 {
00268 PipeReadEnd<T_IN> m_read_end;
00269 PipeWriteEnd<T_OUT> m_write_end;
00270
00271 friend class Task;
00272 void read_from(const PipeReadEnd<T_IN>& from) { m_read_end = from; }
00273 void write_to(const PipeWriteEnd<T_OUT>& to) { m_write_end = to; }
00274
00275 protected:
00276 PipeForwarder() {}
00277 PipeReadEnd<T_IN>& read_end() { return m_read_end; }
00278 PipeWriteEnd<T_OUT>& write_end() { return m_write_end; }
00279
00280 public:
00281 virtual ~PipeForwarder() {}
00282 virtual void run() = 0;
00283 };
00284
00285
// Scope guard for a pipe end (or any type with open() and close()):
// calls e.open() on construction and e.close() on destruction.
//
// The guard keeps a reference to `e`, so `e` has to outlive the guard.
template<typename E>
struct Access
{
  E& m_e;

  Access(E& e): m_e(e) { m_e.open(); }
  ~Access() { m_e.close(); }

private:
  // Not copyable: a copy would call close() a second time for a single
  // open(). Declared but intentionally left undefined.
  Access(const Access&);
  Access& operator=(const Access&);
};
00293
00294
// Scope guard that calls e.close() on destruction. Unlike Access it does not
// call open(); the matching open is expected to have been done by someone
// else.
//
// The guard keeps a reference to `e`, so `e` has to outlive the guard.
template<typename E>
struct CloseOnExit
{
  E& m_e;

  CloseOnExit(E& e): m_e(e) {}
  ~CloseOnExit() { m_e.close(); }

private:
  // Not copyable: every copy would call close() again on the same object.
  // Declared but intentionally left undefined.
  CloseOnExit(const CloseOnExit&);
  CloseOnExit& operator=(const CloseOnExit&);
};
00302
00303 template<typename T_IN>
00304 class ReaderFunctor
00305 {
00306 PipeReader<T_IN>& m_runner;
00307 PipeReadEnd<T_IN> m_read_end;
00308
00309 public:
00310 ReaderFunctor(PipeReader<T_IN>& runner, PipeReadEnd<T_IN> end)
00311 : m_runner(runner), m_read_end(end)
00312 {}
00313
00314 void operator()()
00315 {
00316 CloseOnExit< PipeReadEnd<T_IN> > a(m_read_end);
00317
00318 m_runner.run();
00319 }
00320 };
00321
00322 template<typename T_OUT>
00323 class WriterFunctor
00324 {
00325 PipeWriter<T_OUT>& m_runner;
00326 PipeWriteEnd<T_OUT> m_write_end;
00327
00328 public:
00329 WriterFunctor(PipeWriter<T_OUT>& runner, PipeWriteEnd<T_OUT> end)
00330 : m_runner(runner), m_write_end(end)
00331 {}
00332
00333 void operator()()
00334 {
00335 CloseOnExit< PipeWriteEnd<T_OUT> > a(m_write_end);
00336
00337 m_runner.run();
00338 }
00339 };
00340
00341 template<typename T_IN, typename T_OUT>
00342 class ForwarderFunctor
00343 {
00344 PipeForwarder<T_IN, T_OUT>& m_runner;
00345 PipeReadEnd<T_IN> m_read_end;
00346 PipeWriteEnd<T_OUT> m_write_end;
00347
00348 public:
00349 ForwarderFunctor(PipeForwarder<T_IN, T_OUT>& runner,
00350 PipeReadEnd<T_IN> read_end,
00351 PipeWriteEnd<T_OUT> write_end)
00352 : m_runner(runner), m_read_end(read_end), m_write_end(write_end)
00353 {}
00354
00355 void operator()()
00356 {
00357 CloseOnExit< PipeReadEnd<T_IN> > r(m_read_end);
00358 CloseOnExit< PipeWriteEnd<T_OUT> > w(m_write_end);
00359
00360 m_runner.run();
00361 }
00362 };
00363
00364 class Task
00365 {
00366 boost::thread_group* m_group;
00367
00368
00369 Task(const Task& rhs);
00370 Task& operator=(const Task& rhs);
00371
00372 public:
00373
00374 Task(const boost::function0<void>& fun, int num_threads = 1)
00375 : m_group(new boost::thread_group)
00376 {
00377 assert(num_threads > 0);
00378
00379 while (num_threads--) {
00380 m_group->create_thread(fun);
00381 }
00382 }
00383
00384 template<typename T>
00385 Task(PipeReader<T>& runner, Pipe<T>& pipe, int num_threads = 1)
00386 : m_group(new boost::thread_group)
00387 {
00388 assert(num_threads > 0);
00389
00390 runner.read_from(pipe.read_end());
00391 ReaderFunctor<T> f(runner, pipe.read_end());
00392 pipe.open_read_end(num_threads);
00393 while (num_threads--) {
00394 m_group->create_thread(f);
00395 }
00396 }
00397
00398 template<typename T>
00399 Task(PipeWriter<T>& runner, Pipe<T>& pipe, int num_threads = 1)
00400 : m_group(new boost::thread_group)
00401 {
00402 assert(num_threads > 0);
00403
00404 runner.write_to(pipe.write_end());
00405 WriterFunctor<T> f(runner, pipe.write_end());
00406 pipe.open_write_end(num_threads);
00407 while (num_threads--) {
00408 m_group->create_thread(f);
00409 }
00410 }
00411
00412 template<typename T_IN, typename T_OUT>
00413 Task(PipeForwarder<T_IN, T_OUT>& runner,
00414 Pipe<T_IN>& pipe_in, Pipe<T_OUT>& pipe_out,
00415 int num_threads = 1)
00416 : m_group(new boost::thread_group)
00417 {
00418 assert(num_threads > 0);
00419
00420 runner.read_from(pipe_in.read_end());
00421 pipe_in.open_read_end(num_threads);
00422 runner.write_to(pipe_out.write_end());
00423 pipe_out.open_write_end(num_threads);
00424 ForwarderFunctor<T_IN, T_OUT> f(runner, pipe_in.read_end(), pipe_out.write_end());
00425 while (num_threads--) {
00426 m_group->create_thread(f);
00427 }
00428 }
00429
00430 ~Task()
00431 {
00432 m_group->join_all();
00433 delete m_group;
00434 }
00435
00436 };
00437
00438 }
00439 }
00440 }
00441 }
00442
00443 #endif // EDG_WORKLOAD_COMMON_TASK_TASK_H