PACS System 0.1.0
PACS DICOM system library
Loading...
Searching...
No Matches
accept_worker.cpp
Go to the documentation of this file.
1// BSD 3-Clause License
2// Copyright (c) 2021-2025, 🍀☀🌕🌥 🌊
3// See the LICENSE file in the project root for full license information.
4
11
12#include <cerrno>
13#include <cstring>
14#include <sstream>
15
16#ifdef _WIN32
17// Windows-specific includes already in header
18#else
19#include <fcntl.h>
20#endif
21
23
24// =============================================================================
25// Construction / Destruction
26// =============================================================================
27
29 uint16_t port,
30 connection_callback on_connection,
31 maintenance_callback on_maintenance)
32 : thread_base("accept_worker")
33 , port_(port)
34 , on_connection_(std::move(on_connection))
35 , on_maintenance_(std::move(on_maintenance)) {
36}
37
39 // Ensure graceful shutdown if still running
40 if (is_running()) {
41 stop();
42 }
43}
44
45// =============================================================================
46// Configuration
47// =============================================================================
48
50 backlog_ = backlog;
51}
52
53uint16_t accept_worker::port() const noexcept {
54 return port_;
55}
56
58 return backlog_;
59}
60
61// =============================================================================
62// Status
63// =============================================================================
64
65bool accept_worker::is_accepting() const noexcept {
66 return accepting_.load(std::memory_order_acquire);
67}
68
69std::string accept_worker::to_string() const {
70 std::ostringstream oss;
71 oss << "accept_worker{"
72 << "port=" << port_
73 << ", backlog=" << backlog_
74 << ", accepting=" << (is_accepting() ? "true" : "false")
75 << ", running=" << (is_running() ? "true" : "false")
76 << ", sessions=" << session_id_counter_.load(std::memory_order_relaxed)
77 << "}";
78 return oss.str();
79}
80
81// =============================================================================
82// thread_base Overrides
83// =============================================================================
84
86 // Initialize TCP socket for accepting connections
87
88#ifdef _WIN32
89 // Initialize Winsock
90 WSADATA wsa_data;
91 int wsa_result = WSAStartup(MAKEWORD(2, 2), &wsa_data);
92 if (wsa_result != 0) {
93 return kcenon::common::error_info("WSAStartup failed: " + std::to_string(wsa_result));
94 }
95 wsa_initialized_ = true;
96
97 // Create socket
98 listen_socket_ = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
99 if (listen_socket_ == INVALID_SOCKET) {
100 int err = WSAGetLastError();
101 WSACleanup();
102 wsa_initialized_ = false;
103 return kcenon::common::error_info("Failed to create socket: " + std::to_string(err));
104 }
105
106 // Set socket options
107 int opt = 1;
108 setsockopt(listen_socket_, SOL_SOCKET, SO_REUSEADDR,
109 reinterpret_cast<const char*>(&opt), sizeof(opt));
110
111 // Set non-blocking mode
112 u_long mode = 1;
113 ioctlsocket(listen_socket_, FIONBIO, &mode);
114
115 // Bind
116 sockaddr_in addr{};
117 addr.sin_family = AF_INET;
118 addr.sin_addr.s_addr = INADDR_ANY;
119 addr.sin_port = htons(port_);
120
121 if (bind(listen_socket_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) == SOCKET_ERROR) {
122 int err = WSAGetLastError();
123 closesocket(listen_socket_);
124 listen_socket_ = INVALID_SOCKET;
125 WSACleanup();
126 wsa_initialized_ = false;
127 return kcenon::common::error_info("Failed to bind to port " +
128 std::to_string(port_) + ": " + std::to_string(err));
129 }
130
131 // Listen
132 if (listen(listen_socket_, backlog_) == SOCKET_ERROR) {
133 int err = WSAGetLastError();
134 closesocket(listen_socket_);
135 listen_socket_ = INVALID_SOCKET;
136 WSACleanup();
137 wsa_initialized_ = false;
138 return kcenon::common::error_info("Failed to listen: " + std::to_string(err));
139 }
140#else
141 // Create socket
142 listen_socket_ = socket(AF_INET, SOCK_STREAM, 0);
143 if (listen_socket_ < 0) {
144 return kcenon::common::error_info("Failed to create socket: " +
145 std::string(std::strerror(errno)));
146 }
147
148 // Set socket options
149 int opt = 1;
150 setsockopt(listen_socket_, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
151
152 // Set non-blocking mode
153 int flags = fcntl(listen_socket_, F_GETFL, 0);
154 if (flags >= 0) {
155 fcntl(listen_socket_, F_SETFL, flags | O_NONBLOCK);
156 }
157
158 // Bind
159 sockaddr_in addr{};
160 addr.sin_family = AF_INET;
161 addr.sin_addr.s_addr = INADDR_ANY;
162 addr.sin_port = htons(port_);
163
164 if (bind(listen_socket_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0) {
165 std::string err_msg = std::strerror(errno);
166 close(listen_socket_);
167 listen_socket_ = -1;
168 return kcenon::common::error_info("Failed to bind to port " +
169 std::to_string(port_) + ": " + err_msg);
170 }
171
172 // Listen
173 if (listen(listen_socket_, backlog_) < 0) {
174 std::string err_msg = std::strerror(errno);
175 close(listen_socket_);
176 listen_socket_ = -1;
177 return kcenon::common::error_info("Failed to listen: " + err_msg);
178 }
179#endif
180
181 accepting_.store(true, std::memory_order_release);
182 return kcenon::common::ok();
183}
184
186 // Main work routine for the accept loop
187
188#ifdef _WIN32
189 if (listen_socket_ == INVALID_SOCKET) {
190 return kcenon::common::ok();
191 }
192
193 // Use select with short timeout to check for incoming connections
194 fd_set readfds;
195 FD_ZERO(&readfds);
196 FD_SET(listen_socket_, &readfds);
197
198 timeval tv{};
199 tv.tv_sec = 0;
200 tv.tv_usec = 10000; // 10ms timeout for responsive maintenance callbacks
201
202 int result = select(0, &readfds, nullptr, nullptr, &tv);
203
204 if (result > 0 && FD_ISSET(listen_socket_, &readfds)) {
205 sockaddr_in client_addr{};
206 int addr_len = sizeof(client_addr);
207
208 SOCKET client_socket = accept(listen_socket_,
209 reinterpret_cast<sockaddr*>(&client_addr),
210 &addr_len);
211
212 if (client_socket != INVALID_SOCKET) {
213 // Generate session ID and invoke callback
214 uint64_t session_id = next_session_id();
215
216 if (on_connection_) {
217 on_connection_(session_id);
218 }
219
220 // Note: For now, we accept and immediately close the connection
221 // since association doesn't support real network I/O yet.
222 // The connection callback can be extended to handle the socket
223 // when network_system integration is complete.
224 closesocket(client_socket);
225 }
226 }
227#else
228 if (listen_socket_ < 0) {
229 return kcenon::common::ok();
230 }
231
232 // Use select with short timeout to check for incoming connections
233 fd_set readfds;
234 FD_ZERO(&readfds);
235 FD_SET(listen_socket_, &readfds);
236
237 timeval tv{};
238 tv.tv_sec = 0;
239 tv.tv_usec = 10000; // 10ms timeout for responsive maintenance callbacks
240
241 int result = select(listen_socket_ + 1, &readfds, nullptr, nullptr, &tv);
242
243 if (result > 0 && FD_ISSET(listen_socket_, &readfds)) {
244 sockaddr_in client_addr{};
245 socklen_t addr_len = sizeof(client_addr);
246
247 int client_socket = accept(listen_socket_,
248 reinterpret_cast<sockaddr*>(&client_addr),
249 &addr_len);
250
251 if (client_socket >= 0) {
252 // Generate session ID and invoke callback
253 uint64_t session_id = next_session_id();
254
255 if (on_connection_) {
256 on_connection_(session_id);
257 }
258
259 // Note: For now, we accept and immediately close the connection
260 // since association doesn't support real network I/O yet.
261 // The connection callback can be extended to handle the socket
262 // when network_system integration is complete.
263 close(client_socket);
264 }
265 }
266#endif
267
268 // Invoke maintenance callback for periodic tasks (e.g., idle timeout checks)
269 if (on_maintenance_) {
271 }
272
273 return kcenon::common::ok();
274}
275
277 // Clean up resources after the accept loop stops
278
279 accepting_.store(false, std::memory_order_release);
280
281#ifdef _WIN32
282 if (listen_socket_ != INVALID_SOCKET) {
283 closesocket(listen_socket_);
284 listen_socket_ = INVALID_SOCKET;
285 }
286 if (wsa_initialized_) {
287 WSACleanup();
288 wsa_initialized_ = false;
289 }
290#else
291 if (listen_socket_ >= 0) {
292 close(listen_socket_);
293 listen_socket_ = -1;
294 }
295#endif
296
297 return kcenon::common::ok();
298}
299
301 // Return false to indicate no pending work that must complete before shutdown.
302 // The accept loop will exit promptly when stop() is called.
303 // Note: returning true here would prevent graceful shutdown.
304 return false;
305}
306
308 // Called when stop() is requested, before thread actually stops.
309 // Close the socket to unblock any pending accept operations.
310
311#ifdef _WIN32
312 if (listen_socket_ != INVALID_SOCKET) {
313 closesocket(listen_socket_);
314 listen_socket_ = INVALID_SOCKET;
315 }
316#else
317 if (listen_socket_ >= 0) {
318 close(listen_socket_);
319 listen_socket_ = -1;
320 }
321#endif
322}
323
324// =============================================================================
325// Private Methods
326// =============================================================================
327
329 return session_id_counter_.fetch_add(1, std::memory_order_relaxed);
330}
331
332} // namespace kcenon::pacs::network::detail
Worker thread for accepting incoming DICOM connections.
connection_callback on_connection_
Callback for new connections.
int backlog_
Maximum pending connections in listen queue.
result_void after_stop() override
Cleans up resources after the worker thread stops.
result_void do_work() override
Main work routine - checks for incoming connections.
void set_max_pending_connections(int backlog)
Sets the maximum number of pending connections in the listen queue.
int listen_socket_
Listen socket file descriptor (POSIX)
std::string to_string() const override
Gets a string representation of the worker state.
uint64_t next_session_id()
Generates a unique session ID for new connections.
std::function< void()> maintenance_callback
Callback type for periodic maintenance tasks.
void on_stop_requested() override
Called when stop() is requested.
bool should_continue_work() const override
Determines whether there is pending work that must complete.
std::function< void(uint64_t session_id)> connection_callback
Callback type for new connection events.
result_void before_start() override
Initializes resources before the worker thread starts.
uint16_t port_
TCP port to listen on.
std::atomic< uint64_t > session_id_counter_
Session ID counter for unique connection identification.
maintenance_callback on_maintenance_
Optional callback for maintenance tasks.
uint16_t port() const noexcept
Gets the configured port number.
int max_pending_connections() const noexcept
Gets the current backlog setting.
accept_worker(uint16_t port, connection_callback on_connection, maintenance_callback on_maintenance=nullptr)
Constructs an accept_worker with the specified configuration.
bool is_accepting() const noexcept
Checks if the worker is actively accepting connections.
std::atomic< bool > accepting_
Flag indicating if actively accepting connections.