SW Task Event Loop Framework v1.0.0
High-performance C++ asynchronous event loop framework with timer management and promise-based programming
Loading...
Searching...
No Matches
TimerManager.cpp
Go to the documentation of this file.
1#include "TimerManager.h"
2#include "SLLooper.h"
3#include "Debug.h"
4#include <iostream>
5#include <chrono>
6#include <errno.h>
7#include <cstring>
8
9namespace swt {
10
11#ifndef TIMER_USE_TIMERFD_EPOLL
12// Static members for sigev_thread backend
13std::unordered_map<TimerId, TimerManager*> TimerManager::sTimerManagerMap;
14std::mutex TimerManager::sManagerMapMutex;
15#endif
16
17TimerManager::TimerManager(std::weak_ptr<SLLooper> looper)
18 : mLooper(looper) {
19
20 TIMER_DEBUG_STREAM("Initializing TimerManager with backend: " << getBackendName());
21
22#ifdef TIMER_USE_TIMERFD_EPOLL
23 // Initialize timerfd+epoll backend
24 mEpollFd = epoll_create1(EPOLL_CLOEXEC);
25 if (mEpollFd == -1) {
26 TIMER_ERROR_STREAM("Failed to create epoll fd: " << strerror(errno));
27 throw std::runtime_error("Failed to create epoll fd");
28 }
29
30 TIMER_DEBUG_STREAM("Created epoll fd: " << mEpollFd);
31
32 // Start timer thread
33 mTimerThread = std::thread(&TimerManager::timerThreadFunc, this);
34 TIMER_DEBUG("Timer thread started");
35#else
36 // Sigev_thread backend doesn't need special initialization
37 TIMER_DEBUG("Sigev_thread backend initialized");
38#endif
39}
40
42 TIMER_INFO("Destructor called");
43 mRunning = false;
44
45 // Cleanup all timers
46 {
47 std::lock_guard<std::mutex> lock(mTimersMutex);
48 TIMER_DEBUG_STREAM("Cleaning up " << mTimers.size() << " timers");
49 for (auto& [id, timerInfo] : mTimers) {
50#ifdef TIMER_USE_TIMERFD_EPOLL
51 close(timerInfo.fd);
52#else
53 timer_delete(timerInfo.timer);
54 // Remove from global map
55 std::lock_guard<std::mutex> mapLock(sManagerMapMutex);
56 sTimerManagerMap.erase(id);
57#endif
58 }
59 mTimers.clear();
60 }
61
62#ifdef TIMER_USE_TIMERFD_EPOLL
63 if (mTimerThread.joinable()) {
64 mTimerThread.join();
65 TIMER_DEBUG("Timer thread joined");
66 }
67
68 if (mEpollFd != -1) {
69 close(mEpollFd);
70 TIMER_INFO("Epoll fd closed");
71 }
72#else
73 TIMER_DEBUG("Sigev_thread cleanup completed");
74#endif
75}
76
77TimerId TimerManager::createTimer(std::function<void()> callback, uint64_t delay_ms,
78 bool periodic, std::atomic<bool>* cancelled) {
79 if (!callback) {
80 TIMER_ERROR("callback is null!");
81 return 0;
82 }
83
84 TimerId id = mNextId++;
85 TIMER_DEBUG_STREAM("Creating " << getBackendName() << " timer " << id
86 << " with delay " << delay_ms << "ms, periodic: " << periodic);
87
88#ifdef TIMER_USE_TIMERFD_EPOLL
89 // Timerfd+epoll implementation
90 int timer_fd = createTimerFd(delay_ms, periodic);
91 if (timer_fd == -1) {
92 TIMER_ERROR_STREAM("Failed to create timerfd for timer " << id
93 << ": " << strerror(errno));
94 return 0;
95 }
96
97 TIMER_DEBUG_STREAM("Created timerfd " << timer_fd << " for timer " << id);
98
99 // Add to epoll
100 struct epoll_event ev;
101 ev.events = EPOLLIN;
102 ev.data.u64 = id;
103
104 if (epoll_ctl(mEpollFd, EPOLL_CTL_ADD, timer_fd, &ev) == -1) {
105 TIMER_ERROR_STREAM("Failed to add timer " << id << " to epoll: "
106 << strerror(errno));
107 close(timer_fd);
108 return 0;
109 }
110
111 TIMER_DEBUG_STREAM("Added timer " << id << " to epoll successfully");
112
113 // Store timer info
114 {
115 std::lock_guard<std::mutex> lock(mTimersMutex);
116 TimerInfo info;
117 info.fd = timer_fd;
118 info.callback = std::move(callback);
119 info.periodic = periodic;
120 info.interval_ms = delay_ms;
121 info.id = id;
122 info.cancelled = cancelled;
123
124 mTimers[id] = std::move(info);
125 TIMER_DEBUG_STREAM("Stored timer " << id << " info. Total timers: "
126 << mTimers.size());
127 }
128
129#else
130 // Sigev_thread implementation
131 // Store timer info first (needed for callback)
132 {
133 std::lock_guard<std::mutex> lock(mTimersMutex);
134 TimerInfo info;
135 info.timer = (timer_t)0; // Will be set later
136 info.timerId = id;
137 info.callback = std::move(callback);
138 info.periodic = periodic;
139 info.interval_ms = delay_ms;
140 info.id = id;
141 info.cancelled = cancelled;
142
143 mTimers[id] = std::move(info);
144
145 // Add to global map for callback lookup
146 std::lock_guard<std::mutex> mapLock(sManagerMapMutex);
147 sTimerManagerMap[id] = this;
148 }
149
150 // Create timer
151 timer_t timer = createSigevTimer(id);
152 if (timer == (timer_t)-1) {
153 TIMER_ERROR_STREAM("Failed to create sigev timer for timer " << id);
154 std::lock_guard<std::mutex> lock(mTimersMutex);
155 mTimers.erase(id);
156 std::lock_guard<std::mutex> mapLock(sManagerMapMutex);
157 sTimerManagerMap.erase(id);
158 return 0;
159 }
160
161 // Update timer_t in TimerInfo
162 {
163 std::lock_guard<std::mutex> lock(mTimersMutex);
164 mTimers[id].timer = timer;
165 }
166
167 // Start timer
168 updateSigevTimer(timer, delay_ms, periodic);
169
170 TIMER_DEBUG_STREAM("Created and started sigev timer " << id);
171#endif
172
173 return id;
174}
175
176#ifdef TIMER_USE_TIMERFD_EPOLL
177
178void TimerManager::timerThreadFunc() {
179 TIMER_DEBUG("Timer thread starting...");
180 const int MAX_EVENTS = 64;
181 struct epoll_event events[MAX_EVENTS];
182
183 int loop_count = 0;
184 while (mRunning) {
185 loop_count++;
186
187 int nfds = epoll_wait(mEpollFd, events, MAX_EVENTS, 100); // 100ms timeout
188
189 if (nfds == -1) {
190 if (errno == EINTR) {
191 TIMER_DEBUG("epoll_wait interrupted, continuing...");
192 continue;
193 }
194 TIMER_ERROR_STREAM("epoll_wait error: " << strerror(errno));
195 break;
196 }
197
198 if (nfds > 0) {
199 TIMER_DEBUG_STREAM("Got " << nfds << " timer events (loop " << loop_count << ")");
200 }
201
202 for (int i = 0; i < nfds; i++) {
203 TimerId id = events[i].data.u64;
204 TIMER_DEBUG_STREAM("Processing timer event for ID " << id);
205
206 bool shouldProcess = false;
207 TimerInfo timerInfo;
208
209 {
210 std::lock_guard<std::mutex> lock(mTimersMutex);
211 auto it = mTimers.find(id);
212 if (it != mTimers.end()) {
213 // Check cancelled first
214 if (it->second.cancelled && it->second.cancelled->load()) {
215 TIMER_DEBUG_STREAM("Timer " << id << " is cancelled, cleaning up");
216
217 // Read to clear event
218 uint64_t exp;
219 ssize_t bytes = read(it->second.fd, &exp, sizeof(exp));
220 TIMER_DEBUG_STREAM("Read " << bytes << " bytes from cancelled timer " << id);
221
222 // Cleanup immediately
223 epoll_ctl(mEpollFd, EPOLL_CTL_DEL, it->second.fd, nullptr);
224 close(it->second.fd);
225 mTimers.erase(it);
226 continue;
227 }
228
229 timerInfo = it->second;
230 shouldProcess = true;
231 }
232 }
233
234 if (shouldProcess) {
235 // Read event
236 uint64_t exp;
237 ssize_t bytes = read(timerInfo.fd, &exp, sizeof(exp));
238 TIMER_DEBUG_STREAM("Read " << bytes << " bytes from timer " << id
239 << ", expirations: " << exp);
240
241 // Final check before callback
242 if (timerInfo.cancelled && timerInfo.cancelled->load()) {
243 TIMER_DEBUG_STREAM("Timer " << id << " cancelled before callback");
244 cleanupTimer(id);
245 continue;
246 }
247
248 // Handle timer expired
249 handleTimerExpired(timerInfo);
250
251 // Cleanup non-periodic timers
252 if (!timerInfo.periodic) {
253 TIMER_DEBUG_STREAM("Cleaning up one-shot timer " << id);
254 cleanupTimer(id);
255 }
256 }
257 }
258 }
259
260 TIMER_DEBUG_STREAM("Timer thread exiting after " << loop_count << " loops");
261}
262
263int TimerManager::createTimerFd(uint64_t delay_ms, bool periodic) {
264 TIMER_DEBUG_STREAM("Creating timerfd with delay " << delay_ms
265 << "ms, periodic: " << periodic);
266
267 int timer_fd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC);
268 if (timer_fd == -1) {
269 TIMER_ERROR_STREAM("timerfd_create failed: " << strerror(errno));
270 return -1;
271 }
272
273 struct itimerspec its;
274 its.it_value.tv_sec = delay_ms / 1000;
275 its.it_value.tv_nsec = (delay_ms % 1000) * 1000000;
276
277 if (periodic) {
278 its.it_interval = its.it_value;
279 TIMER_DEBUG_STREAM("Setting periodic timer: " << its.it_value.tv_sec
280 << "s " << its.it_value.tv_nsec << "ns");
281 } else {
282 its.it_interval.tv_sec = 0;
283 its.it_interval.tv_nsec = 0;
284 TIMER_DEBUG_STREAM("Setting one-shot timer: " << its.it_value.tv_sec
285 << "s " << its.it_value.tv_nsec << "ns");
286 }
287
288 if (timerfd_settime(timer_fd, 0, &its, nullptr) == -1) {
289 TIMER_ERROR_STREAM("timerfd_settime failed: " << strerror(errno));
290 close(timer_fd);
291 return -1;
292 }
293
294 return timer_fd;
295}
296
297void TimerManager::updateTimerFd(int fd, uint64_t delay_ms, bool periodic) {
298 TIMER_DEBUG_STREAM("Updating timerfd " << fd << " with delay " << delay_ms
299 << "ms, periodic: " << periodic);
300
301 struct itimerspec its;
302 its.it_value.tv_sec = delay_ms / 1000;
303 its.it_value.tv_nsec = (delay_ms % 1000) * 1000000;
304
305 if (periodic) {
306 its.it_interval = its.it_value;
307 } else {
308 its.it_interval.tv_sec = 0;
309 its.it_interval.tv_nsec = 0;
310 }
311
312 if (timerfd_settime(fd, 0, &its, nullptr) == -1) {
313 TIMER_ERROR_STREAM("Failed to update timerfd " << fd << ": "
314 << strerror(errno));
315 }
316}
317
318#else
319
320timer_t TimerManager::createSigevTimer(TimerId id) {
321 timer_t timer;
322
323 struct sigevent sev;
324 sev.sigev_notify = SIGEV_THREAD;
325 sev.sigev_notify_function = &TimerManager::sigevTimerCallback;
326 sev.sigev_notify_attributes = nullptr;
327 sev.sigev_value.sival_ptr = reinterpret_cast<void*>(id); // Pass timer ID
328
329 if (timer_create(CLOCK_MONOTONIC, &sev, &timer) == -1) {
330 TIMER_ERROR_STREAM("timer_create failed: " << strerror(errno));
331 return (timer_t)-1;
332 }
333
334 return timer;
335}
336
337void TimerManager::updateSigevTimer(timer_t timer, uint64_t delay_ms, bool periodic) {
338 struct itimerspec its;
339 its.it_value.tv_sec = delay_ms / 1000;
340 its.it_value.tv_nsec = (delay_ms % 1000) * 1000000;
341
342 if (periodic) {
343 its.it_interval = its.it_value;
344 TIMER_DEBUG_STREAM("Setting periodic sigev timer: " << its.it_value.tv_sec
345 << "s " << its.it_value.tv_nsec << "ns");
346 } else {
347 its.it_interval.tv_sec = 0;
348 its.it_interval.tv_nsec = 0;
349 TIMER_DEBUG_STREAM("Setting one-shot sigev timer: " << its.it_value.tv_sec
350 << "s " << its.it_value.tv_nsec << "ns");
351 }
352
353 if (timer_settime(timer, 0, &its, nullptr) == -1) {
354 TIMER_ERROR_STREAM("timer_settime failed: " << strerror(errno));
355 }
356}
357
358void TimerManager::sigevTimerCallback(sigval_t sv) {
359 TimerId id = reinterpret_cast<TimerId>(sv.sival_ptr);
360
361 TIMER_DEBUG_STREAM("Sigev timer callback for timer " << id);
362
363 // Find TimerManager for this timer
364 TimerManager* manager = nullptr;
365 {
366 std::lock_guard<std::mutex> lock(sManagerMapMutex);
367 auto it = sTimerManagerMap.find(id);
368 if (it != sTimerManagerMap.end()) {
369 manager = it->second;
370 }
371 }
372
373 if (!manager) {
374 TIMER_DEBUG_STREAM("Timer manager not found for timer " << id);
375 return;
376 }
377
378 TimerInfo timerInfo;
379 bool shouldProcess = false;
380
381 {
382 std::lock_guard<std::mutex> lock(manager->mTimersMutex);
383 auto it = manager->mTimers.find(id);
384 if (it != manager->mTimers.end()) {
385 // Check cancelled
386 if (it->second.cancelled && it->second.cancelled->load()) {
387 TIMER_DEBUG_STREAM("Sigev timer " << id << " is cancelled, cleaning up");
388 timer_delete(it->second.timer);
389 manager->mTimers.erase(it);
390 std::lock_guard<std::mutex> mapLock(sManagerMapMutex);
391 sTimerManagerMap.erase(id);
392 return;
393 }
394
395 timerInfo = it->second;
396 shouldProcess = true;
397
398 // Cleanup non-periodic timers
399 if (!timerInfo.periodic) {
400 TIMER_DEBUG_STREAM("Cleaning up one-shot sigev timer " << id);
401 timer_delete(it->second.timer);
402 manager->mTimers.erase(it);
403 std::lock_guard<std::mutex> mapLock(sManagerMapMutex);
404 sTimerManagerMap.erase(id);
405 }
406 }
407 }
408
409 if (shouldProcess) {
410 manager->handleTimerExpired(timerInfo);
411 }
412}
413
414#endif
415
417 TIMER_DEBUG_STREAM("Cancelling " << getBackendName() << " timer " << id);
418 std::lock_guard<std::mutex> lock(mTimersMutex);
419
420 auto it = mTimers.find(id);
421 if (it == mTimers.end()) {
422 TIMER_DEBUG_STREAM("Timer " << id << " not found for cancellation");
423 return false;
424 }
425
426#ifdef TIMER_USE_TIMERFD_EPOLL
427 epoll_ctl(mEpollFd, EPOLL_CTL_DEL, it->second.fd, nullptr);
428 close(it->second.fd);
429#else
430 timer_delete(it->second.timer);
431 std::lock_guard<std::mutex> mapLock(sManagerMapMutex);
432 sTimerManagerMap.erase(id);
433#endif
434
435 mTimers.erase(it);
436
437 TIMER_DEBUG_STREAM("Timer " << id << " cancelled successfully. Remaining: "
438 << mTimers.size());
439 return true;
440}
441
443 std::lock_guard<std::mutex> lock(mTimersMutex);
444 bool exists = mTimers.find(id) != mTimers.end();
445 TIMER_DEBUG_STREAM("Timer " << id << " exists: " << exists);
446 return exists;
447}
448
449bool TimerManager::restartTimer(TimerId id, uint64_t delay_ms) {
450 TIMER_DEBUG_STREAM("Restarting " << getBackendName() << " timer " << id
451 << " with delay " << delay_ms << "ms");
452 std::lock_guard<std::mutex> lock(mTimersMutex);
453
454 auto it = mTimers.find(id);
455 if (it == mTimers.end()) {
456 TIMER_DEBUG_STREAM("Timer " << id << " not found for restart");
457 return false;
458 }
459
460 // Reset cancelled flag
461 if (it->second.cancelled) {
462 it->second.cancelled->store(false);
463 }
464
465#ifdef TIMER_USE_TIMERFD_EPOLL
466 updateTimerFd(it->second.fd, delay_ms, false);
467#else
468 updateSigevTimer(it->second.timer, delay_ms, false);
469#endif
470
471 it->second.interval_ms = delay_ms;
472 it->second.periodic = false;
473
474 TIMER_DEBUG_STREAM("Timer " << id << " restarted successfully");
475 return true;
476}
477
479 std::lock_guard<std::mutex> lock(mTimersMutex);
480 return mTimers.size();
481}
482
483void TimerManager::updateCancelledPtr(TimerId id, std::atomic<bool>* newPtr) {
484 TIMER_DEBUG_STREAM("Updating cancelled pointer for timer " << id);
485 std::lock_guard<std::mutex> lock(mTimersMutex);
486
487 auto it = mTimers.find(id);
488 if (it != mTimers.end()) {
489 it->second.cancelled = newPtr;
490 TIMER_DEBUG_STREAM("Updated cancelled pointer for timer " << id << " successfully");
491 } else {
492 TIMER_DEBUG_STREAM("Timer " << id << " not found for pointer update");
493 }
494}
495
496void TimerManager::handleTimerExpired(const TimerInfo& timerInfo) {
497 TIMER_DEBUG_STREAM("Handling expired timer " << timerInfo.id);
498
499 // Double-check cancelled before post callback
500 if (timerInfo.cancelled && timerInfo.cancelled->load()) {
501 TIMER_DEBUG_STREAM("Timer " << timerInfo.id << " cancelled, skipping callback");
502 return;
503 }
504
505 // Post callback to SLLooper main thread
506 if (auto looper = mLooper.lock()) {
507 TIMER_DEBUG_STREAM("Posting callback for timer " << timerInfo.id << " to main thread");
508
509 auto result = looper->post([callback = timerInfo.callback, cancelled = timerInfo.cancelled, id = timerInfo.id]() {
510 TIMER_DEBUG_STREAM("Executing callback for timer " << id << " in main thread");
511
512 // Triple-check cancelled in main thread
513 if (cancelled && cancelled->load()) {
514 TIMER_DEBUG_STREAM("Timer " << id << " cancelled in main thread, skipping");
515 return;
516 }
517
518 try {
519 callback();
520 TIMER_DEBUG_STREAM("Callback for timer " << id << " executed successfully");
521 } catch (const std::exception& e) {
522 TIMER_ERROR_STREAM("Timer " << id << " callback exception: " << e.what());
523 }
524 });
525
526 TIMER_DEBUG_STREAM("Posted callback for timer " << timerInfo.id
527 << " (future valid: " << result.valid() << ")");
528 } else {
529 TIMER_ERROR_STREAM("Failed to lock looper for timer " << timerInfo.id);
530 }
531}
532
533void TimerManager::cleanupTimer(TimerId id) {
534 TIMER_DEBUG_STREAM("Cleaning up timer " << id);
535 std::lock_guard<std::mutex> lock(mTimersMutex);
536 auto it = mTimers.find(id);
537 if (it != mTimers.end()) {
538#ifdef TIMER_USE_TIMERFD_EPOLL
539 epoll_ctl(mEpollFd, EPOLL_CTL_DEL, it->second.fd, nullptr);
540 close(it->second.fd);
541#else
542 timer_delete(it->second.timer);
543 std::lock_guard<std::mutex> mapLock(sManagerMapMutex);
544 sTimerManagerMap.erase(id);
545#endif
546 mTimers.erase(it);
547 TIMER_DEBUG_STREAM("Timer " << id << " cleaned up. Remaining: "
548 << mTimers.size());
549 }
550}
551
552} // namespace swt
Centralized debug and logging macros for SW Task Framework components.
#define TIMER_ERROR_STREAM(stream_expr)
Always-on error logging for TimerManager with stream expression.
Definition Debug.h:137
#define TIMER_DEBUG(msg)
Conditional debug logging for TimerManager with simple message.
Definition Debug.h:77
#define TIMER_INFO(msg)
Always-on info logging for TimerManager lifecycle events.
Definition Debug.h:156
#define TIMER_ERROR(msg)
Always-on error logging for TimerManager critical issues.
Definition Debug.h:122
#define TIMER_DEBUG_STREAM(stream_expr)
Conditional debug logging for TimerManager with stream expression.
Definition Debug.h:99
Main event loop coordinator for asynchronous task management and timer operations.
High-performance timer management using Linux timerfd+epoll or sigev_thread.
TimerManager(std::weak_ptr< SLLooper > looper)
Construct a new TimerManager.
TimerId createTimer(std::function< void()> callback, uint64_t delay_ms, bool periodic, std::atomic< bool > *cancelled)
Create a new timer.
size_t getActiveTimerCount()
Get the number of currently active timers.
void updateCancelledPtr(TimerId id, std::atomic< bool > *newPtr)
Update the cancellation flag pointer for an existing timer.
~TimerManager()
Destroy the TimerManager and cleanup all resources.
static const char * getBackendName()
Get the name of the currently compiled timer backend.
bool hasTimer(TimerId id)
Check if a timer exists and is active.
bool restartTimer(TimerId id, uint64_t delay_ms)
Restart an existing timer with new delay.
bool cancelTimer(TimerId id)
Cancel an active timer.
Software Timer namespace containing all timer-related classes.
Definition Awaitable.h:21
uint64_t TimerId
Unique identifier type for timer instances.
Definition Timer.h:25
Internal timer information structure.
std::function< void()> callback
User callback function to execute on timer expiry.
TimerId id
Unique timer identifier.
int fd
timerfd file descriptor for Linux backend
uint64_t interval_ms
Timer interval in milliseconds.
std::atomic< bool > * cancelled
Pointer to cancellation flag (optional)
bool periodic
true for repeating timer, false for one-shot