BlackCat_Tensors
A GPU-supported autograd and linear algebra library, designed for neural network construction
host_stream.h
Go to the documentation of this file.
1 
2 #ifndef BC_CONTEXT_HOSTSTREAM_H_
3 #define BC_CONTEXT_HOSTSTREAM_H_
4 
6 
#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
13 
14 namespace bc {
15 namespace streams {
16 
/**
 * A one-shot, host-side synchronization event.
 *
 * get_recorder() and get_waiter() return small callable handles that share
 * this event's state (copies of a HostEvent share it as well, since the state
 * is held by a shared_ptr). Invoking a recorder marks the event as recorded
 * and wakes every waiter; invoking a waiter blocks the calling thread until
 * the event has been recorded. The flag is never reset, so once recorded,
 * subsequent waits return immediately.
 */
class HostEvent {

	/// Shared state between the event, its recorders and its waiters.
	struct contents {
		std::atomic_bool recorded{false};
		std::condition_variable cv;
		std::mutex m_mutex;
	};

	/// Blocks the caller until the shared event has been recorded.
	struct waiting_functor {
		std::shared_ptr<contents> m_contents;

		void operator () () const {
			contents& state = *m_contents;

			// Fast path: already recorded, no need to touch the mutex.
			if (state.recorded.load())
				return;

			std::unique_lock<std::mutex> locker(state.m_mutex);
			state.cv.wait(locker, [&state]() { return state.recorded.load(); });
		}
	};

	/// Marks the shared event as recorded and wakes all waiters.
	struct recording_functor {
		std::shared_ptr<contents> m_contents;

		void operator () () const {
			contents& state = *m_contents;
			{
				// The flag must be published while holding the mutex. Otherwise
				// it could flip (and the notify could fire) between a waiter's
				// predicate check and the moment it blocks inside cv.wait, and
				// that waiter would never wake up (lost wakeup).
				std::lock_guard<std::mutex> locker(state.m_mutex);
				state.recorded.store(true);
			}
			state.cv.notify_all();
		}
	};

	std::shared_ptr<contents> m_contents = std::make_shared<contents>();

public:

	/// Returns a callable that records this event when invoked.
	recording_functor get_recorder() {
		return {m_contents};
	}

	/// Returns a callable that blocks until this event has been recorded.
	waiting_functor get_waiter() {
		return {m_contents};
	}
};
55 
/**
 * A host-side (CPU) execution stream.
 *
 * Jobs submitted with push() execute one at a time, in submission order.
 * There is no dedicated worker thread. The thread whose push() finds the
 * stream idle becomes the "runner" and drains the queue, inside
 * BC_omp_async__ when that macro is available. Jobs pushed while a runner is
 * active are queued, whether they come from other threads or from inside a
 * running job. The runner executes them after everything submitted before
 * them.
 *
 * All public members lock m_queue_lock, so they may be called concurrently.
 */
class HostStream {

	/// Type-erased job stored in the queue.
	struct Job {
		virtual void operator () () const = 0;
		virtual void run () const = 0;
		virtual ~Job() = default;
	};

	template<class function>
	struct JobInstance final : Job {
		function f;
		explicit JobInstance(function fn) : f(std::move(fn)) {}
		void operator () () const override { f(); }
		void run() const override { f(); }
	};

	mutable std::mutex m_queue_lock;
	std::queue<std::unique_ptr<Job>> m_queue;

	// Guarded by m_queue_lock: true from the moment a push() claims the runner
	// role until that runner observes an empty queue. An empty queue alone is
	// not an idle signal, because the job being executed has already been popped.
	bool m_running = false;

	/**
	 * Executes queued jobs until the queue is empty, then releases the runner
	 * role. Must only be called by the thread that set m_running.
	 * If a job throws, the runner role is released and the exception is
	 * rethrown. Remaining jobs stay queued and are drained, in order, by the
	 * next push().
	 */
	void execute_queue() {
		std::unique_lock<std::mutex> lock(m_queue_lock);
		while (!m_queue.empty()) {
			std::unique_ptr<Job> job = std::move(m_queue.front());
			m_queue.pop();
			lock.unlock();  // run unlocked so jobs and other threads can push()

			try {
				job->run();
			} catch (...) {
				lock.lock();
				m_running = false;
				throw;
			}

			job.reset();    // destroy captures outside the lock (their dtors may push())
			lock.lock();
		}
		// Cleared while still holding the lock, so a concurrent push() either
		// saw m_running == true before this point (and its job was drained
		// above) or sees false afterwards and becomes the next runner.
		m_running = false;
	}

public:

	/**
	 * Submits `functor` for in-order execution on this stream.
	 * If the stream is idle, the calling thread becomes the runner and drains
	 * the queue (asynchronously if BC_omp_async__ spawns a task). Otherwise
	 * the job is queued and this call returns immediately.
	 */
	template<class function>
	void push(function functor) {
		{
			std::lock_guard<std::mutex> lock(m_queue_lock);
			m_queue.push(std::unique_ptr<Job>(new JobInstance<function>(std::move(functor))));
			if (m_running)
				return;  // the active runner will execute this job in order
			m_running = true;
		}
#ifdef BC_omp_async__
		BC_omp_async__(
			execute_queue();
		)
#else
		// Header used without common.h: drain synchronously.
		execute_queue();
#endif
	}

	/// True when no jobs are waiting in the queue (a job may still be executing).
	bool empty() const {
		std::lock_guard<std::mutex> lock(m_queue_lock);
		return m_queue.empty();
	}

	/// True while the stream has unfinished work: a job is executing or queued.
	bool active() const {
		std::lock_guard<std::mutex> lock(m_queue_lock);
		return m_running || !m_queue.empty();
	}
}; //End of Queue object
117 
118 
119 }
120 }
121 
122 #endif
bool active() const
Definition: host_stream.h:109
waiting_functor get_waiter()
Definition: host_stream.h:51
bool empty() const
Definition: host_stream.h:105
Definition: host_stream.h:17
Definition: host_stream.h:56
void push(function functor)
Definition: host_stream.h:91
recording_functor get_recorder()
Definition: host_stream.h:48
#define BC_omp_async__(...)
Definition: common.h:266
The Evaluator determines if an expression needs to be greedily optimized.
Definition: algorithms.h:22