SW Task Event Loop Framework v1.0.0
High-performance C++ asynchronous event loop framework with timer management and promise-based programming
Loading...
Searching...
No Matches
EventQueue.cpp
Go to the documentation of this file.
1#include <chrono>
2#include "Message.h"
3#include "Promise.h"
4#include "EventQueue.h"
5#include <mutex>
6#include <thread>
7#include <iostream>
8using namespace std;
9#include <string>
10
11namespace swt {
12
14 mCurrentMessage(NULL),
15 mStarted(false), mQuit(false)
16{
17}
18
20 try {
21 // std::cout << "EventQueue destructor called" << std::endl;
22 // ...existing cleanup code...
23 // std::cout << "EventQueue destructor finished" << std::endl;
24 } catch (const std::exception& e) {
25 std::cerr << "Exception in EventQueue destructor: " << e.what() << std::endl;
26 } catch (...) {
27 std::cerr << "Unknown exception in EventQueue destructor" << std::endl;
28 }
29}
30
31bool EventQueue::enqueueMessage(const std::shared_ptr<Message>& message, int64_t whenUs)
32{
33 // cout<<"message queue: receive msg\n";
34
35 {
36 std::lock_guard<std::mutex> lock(iMutex);
37
38 // Insert at correct position instead of sort
39 auto insertPos = std::upper_bound(mQueue.begin(), mQueue.end(), whenUs,
40 [](int64_t time, const QueueItem& item) {
41 return time < item.whenUs;
42 });
43 mQueue.emplace(insertPos, message, whenUs);
44 }
45
46 // ALWAYS notify, regardless of mStarted
47 mQueueChanged.notify_one();
48 return true;
49}
50
51// Kiểm tra queue đã quit chưa
53 return mQuit;
54}
55
56std::optional<EventQueue::QueueItem> EventQueue::pollNext()
57{
58 // std::cout << "EventQueue: pollNext() called" << std::endl;
59
60 int poll_attempt = 0;
61 while (!mQuit) {
62 poll_attempt++;
63 mStarted = true;
64
65 // std::cout << "EventQueue: poll attempt " << poll_attempt << std::endl;
66
67 {
68 std::unique_lock<std::mutex> lk(iMutex);
69 // std::cout << "EventQueue: checking queue, size: " << mQueue.size() << std::endl;
70
71 // CHECK QUEUE FIRST before going to wait
72 if (!mQueue.empty()) {
73 auto currentTime = uptimeMicros();
74 // std::cout << "EventQueue: found item, time: " << mQueue.front().whenUs
75 // << ", current time: " << currentTime << std::endl;
76
77 if (mQueue.front().whenUs <= currentTime) {
78 auto item = std::move(mQueue.front());
79 mQueue.pop_front();
80 // std::cout << "EventQueue: returning immediate item, type: " << (int)item.type << std::endl;
81 return item;
82 }
83 }
84
85 if (mQueue.empty()) {
86 // std::cout << "EventQueue: queue empty, entering wait..." << std::endl;
87
88 // SHORTER timeout to avoid infinite wait
89 auto result = mQueueChanged.wait_for(lk, std::chrono::milliseconds(500), [this]{
90 // std::cout << "EventQueue: wait predicate - queue size: " << mQueue.size() << ", quit: " << mQuit << std::endl;
91 return (!mQueue.empty() || mQuit);
92 });
93
94 if (!result) {
95 // std::cout << "EventQueue: wait timed out after 500ms! Attempt: " << poll_attempt << std::endl;
96
97 // SAFETY: Return null after multiple timeouts
98 if (poll_attempt >= 20) { // 20 * 500ms = 10 seconds max
99 // std::cout << "EventQueue: too many timeouts, returning null" << std::endl;
100 return std::nullopt;
101 }
102 continue;
103 }
104
105 // std::cout << "EventQueue: woke up from wait" << std::endl;
106 }
107
108 if (mQuit) {
109 // std::cout << "EventQueue: quit requested" << std::endl;
110 break;
111 }
112
113 // RECHECK queue after wait
114 if (!mQueue.empty()) {
115 auto currentTime = uptimeMicros();
116 // std::cout << "EventQueue: after wait, front item time: " << mQueue.front().whenUs
117 // << ", current time: " << currentTime << std::endl;
118
119 if (mQueue.front().whenUs <= currentTime) {
120 auto item = std::move(mQueue.front());
121 mQueue.pop_front();
122 // std::cout << "EventQueue: returning post-wait item, type: " << (int)item.type << std::endl;
123 return item;
124 } else {
125 // Future item - wait for it
126 auto waitTime = mQueue.front().whenUs - currentTime;
127 // std::cout << "EventQueue: waiting " << waitTime << " microseconds for future item" << std::endl;
128
129 if (waitTime > 500000) waitTime = 500000; // Cap at 500ms
130
131 mQueueChanged.wait_for(lk, std::chrono::microseconds(waitTime));
132 // std::cout << "EventQueue: finished waiting for future item" << std::endl;
133 }
134 }
135 }
136 }
137
138 // std::cout << "EventQueue: returning nullopt" << std::endl;
139 return std::nullopt;
140}
141
142std::shared_ptr<Message> EventQueue::poll()
143{
144 // Legacy method - convert to new format
145 auto item = pollNext();
146 if (item && item->type == QueueItemType::MESSAGE) {
147 return item->message;
148 }
149 return nullptr;
150}
151
152bool EventQueue::hasMessage(const std::shared_ptr<Handler>& h, int32_t what, void* obj)
153{
154 if (h == NULL) {
155 return false;
156 }
157 std::lock_guard<std::mutex> lock(iMutex);
158
159 for (const auto& item : mQueue) {
160 if (item.type == QueueItemType::MESSAGE &&
161 item.message->mHandler == h &&
162 item.message->what == what &&
163 (obj == NULL || item.message->obj == obj)) {
164 return true;
165 }
166 }
167 return false;
168}
169
171{
172 {
173 std::lock_guard<std::mutex> lock(iMutex);
174 mQuit = true;
175 }
176 mQueueChanged.notify_all();
177}
178
180{
181 auto now = std::chrono::steady_clock::now();
182 auto dur = std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch());
183 return dur.count();
184}
185
186
187
188// Explicit instantiations
189template swt::Promise<int> swt::EventQueue::enqueuePromise<int>();
190template swt::Promise<void> swt::EventQueue::enqueuePromise<void>();
191template swt::Promise<std::string> swt::EventQueue::enqueuePromise<std::string>();
192
193} // namespace swt
Unified event queue supporting messages and functions with timed execution.
Promise class for asynchronous result handling with continuation chaining and type safety.
EventQueue()
Constructor - initializes empty queue.
std::optional< QueueItem > pollNext()
Poll for next ready queue item (unified interface)
int64_t uptimeMicros()
Get current system uptime in microseconds.
~EventQueue()
Destructor - cleanup remaining items.
std::shared_ptr< Message > poll()
Poll for next ready message (legacy compatibility)
@ MESSAGE
Traditional message with handler.
void quit()
Request queue shutdown.
bool hasMessage(const std::shared_ptr< Handler > &h, int32_t what, void *obj)
Check for specific message in queue.
bool isQuit()
Check if quit message was received.
bool enqueueMessage(const std::shared_ptr< Message > &message, int64_t whenUs)
Enqueue traditional message for timed execution.
Type-safe promise for asynchronous result handling with continuation chaining.
Definition Promise.h:68
Software Timer namespace containing all timer-related classes.
Definition Awaitable.h:21
Unified queue item supporting both messages and function tasks.
Definition EventQueue.h:115