quic/qbox
Loading...
Searching...
No Matches
char_backend_socket.h
1/*
2 * Copyright (c) 2022 Qualcomm Innovation Center, Inc. All Rights Reserved.
3 *
4 * SPDX-License-Identifier: BSD-3-Clause
5 */
6
12#ifndef _GS_UART_BACKEND_SOCKET_H_
13#define _GS_UART_BACKEND_SOCKET_H_
14
15#include <unistd.h>
16#include <cstring>
17#include <stdio.h>
18
19#include <asio.hpp>
20#include <thread>
21#include <chrono>
22#include <atomic>
23
24#include <scp/report.h>
25
26#include <async_event.h>
27#include <uutils.h>
28#include <ports/biflow-socket.h>
29#include <module_factory_registery.h>
30
31using asio::ip::tcp;
32class char_backend_socket : public sc_core::sc_module
33{
34protected:
35 cci::cci_param<std::string> p_address;
36 cci::cci_param<bool> p_server;
37 cci::cci_param<bool> p_nowait;
38 cci::cci_param<bool> p_sigquit;
39
40private:
41 asio::io_context m_io_context;
42 tcp::acceptor m_acceptor;
43 tcp::socket m_asio_socket;
44 std::thread m_rcv_thread;
45
46 tcp::endpoint m_local_endpoint;
47
48 static constexpr size_t m_buffer_size = 1024;
49 std::array<char, m_buffer_size> m_buffer;
50
51 // Allow only 1 connection at a time
52 const int m_tcp_backlog = 1;
53
54 // Flag to signal end of operation to receive thread
55 std::atomic<bool> m_stop_rcv_thread = false;
56
57 const std::chrono::milliseconds m_retry_delay = std::chrono::milliseconds(100);
58
59 SCP_LOGGER();
60
61public:
63
64 char_backend_socket(sc_core::sc_module_name name)
65 : sc_core::sc_module(name)
66 , p_address("address", "127.0.0.1:4001", "socket address IP:Port")
67 , p_server("server", true, "type of socket: true if server - false if client")
68 , p_nowait("nowait", true, "setting socket in non-blocking mode")
69 , p_sigquit("sigquit", false, "Interpret 0x1c in the data stream as a sigquit")
70 , m_biflow_socket("biflow_socket")
71 , m_acceptor(m_io_context)
72 , m_asio_socket(m_io_context)
73 {
74 SCP_TRACE(()) << "char_backend_socket constructor";
75
76 if (!set_endpoint()) {
77 SCP_FATAL(()) << "Failed to set local endpoint";
78 }
79
80 m_biflow_socket.register_b_transport(this, &char_backend_socket::writefn);
81 }
82
83 ~char_backend_socket() { cleanup_receive_thread(); }
84
85 void end_of_simulation() { cleanup_receive_thread(); }
86
87 void cleanup_receive_thread()
88 {
89 m_stop_rcv_thread = true;
90 asio::error_code ignored_error;
91
92 m_asio_socket.close(ignored_error);
93 m_acceptor.close(ignored_error);
94
95 if (m_rcv_thread.joinable()) m_rcv_thread.join();
96 }
97
98 bool set_endpoint()
99 {
100 std::string param_ip, param_port;
101 asio::ip::address ip;
102 unsigned short port;
103
104 size_t count = std::count(p_address.get_value().begin(), p_address.get_value().end(), ':');
105 if (count == 1) {
106 size_t first = p_address.get_value().find_first_of(':');
107 param_ip = p_address.get_value().substr(0, first);
108 param_port = p_address.get_value().substr(first + 1);
109 } else {
110 SCP_ERR(()) << "malformed address, expecting IP:PORT (e.g. 127.0.0.1:4001)";
111 return false;
112 }
113
114 asio::error_code ec;
115 ip = asio::ip::make_address(param_ip, ec);
116 if (ec) {
117 SCP_ERR(()) << "Invalid IP address: " << param_ip << ", error: " << ec.message();
118 return false;
119 }
120
121 try {
122 port = static_cast<unsigned short>(std::stoi(param_port));
123 } catch (const std::invalid_argument& e) {
124 SCP_ERR(()) << "Invalid port: " << param_port << " (not a number)";
125 return false;
126 } catch (const std::out_of_range& e) {
127 SCP_ERR(()) << "Port out of range: " << param_port;
128 return false;
129 }
130
131 m_local_endpoint = tcp::endpoint(ip, port);
132
133 return true;
134 }
135
136 void start_of_simulation()
137 {
138 SCP_DEBUG(()) << "IP: " << m_local_endpoint.address() << ", PORT: " << m_local_endpoint.port();
139
140 if (p_server) {
141 setup_tcp_server();
142 }
143
144 m_rcv_thread = std::thread(&char_backend_socket::rcv_thread, this);
145 }
146
147 void end_of_elaboration() { m_biflow_socket.can_receive_any(); }
148
149 void setup_tcp_server()
150 {
151 asio::error_code ec;
152
153 m_acceptor.open(m_local_endpoint.protocol(), ec);
154 if (ec) {
155 SCP_ERR(()) << "Failed to open acceptor: " << ec.message();
156 return;
157 }
158
159 m_acceptor.set_option(asio::socket_base::reuse_address(true), ec);
160 if (ec) {
161 SCP_ERR(()) << "Failed to set reuse_address option: " << ec.message();
162 m_acceptor.close();
163 return;
164 }
165
166 m_acceptor.bind(m_local_endpoint, ec);
167 if (ec) {
168 SCP_ERR(()) << "Failed to bind acceptor: " << ec.message();
169 m_acceptor.close();
170 return;
171 }
172
173 m_acceptor.listen(m_tcp_backlog, ec);
174 if (ec) {
175 SCP_ERR(()) << "Failed to listen on acceptor: " << ec.message();
176 m_acceptor.close();
177 return;
178 }
179
180 m_acceptor.non_blocking(true, ec);
181 if (ec) {
182 SCP_ERR(()) << "Failed to set acceptor to non-blocking: " << ec.message();
183 m_acceptor.close();
184 return;
185 }
186 }
187
188 void writefn(tlm::tlm_generic_payload& txn, sc_core::sc_time& t)
189 {
190 while (!m_asio_socket.is_open()) {
191 if (p_nowait) {
192 return;
193 }
194 SCP_WARN(()) << "waiting for socket connection on IP address: " << p_address.get_value();
195 std::this_thread::sleep_for(m_retry_delay);
196 }
197
198 asio::error_code ec;
199
200 uint8_t* data = txn.get_data_ptr();
201 size_t bytes_to_send = txn.get_streaming_width();
202 size_t bytes_transferred = asio::write(m_asio_socket, asio::buffer(data, bytes_to_send), ec);
203 if (ec || (bytes_transferred != bytes_to_send)) {
204 if (p_nowait) {
205 SCP_WARN(())("(Non blocking) socket closed");
206 return;
207 } else {
208 SCP_WARN(())("(Blocking) socket closed.");
209 }
210 }
211 }
212
213 void rcv_thread()
214 {
215 try {
216 do_receive();
217 } catch (const asio::system_error& e) {
218 std::cout << "Caught exception: " << e.what() << std::endl;
219 }
220 }
221
222 void configure_socket()
223 {
224 asio::error_code ec;
225
226 m_asio_socket.non_blocking(true, ec);
227 if (ec) {
228 SCP_FATAL(()) << " Failed to set non blocking mode: " << ec.message();
229 }
230
231 // Disable Nagle's Algorithm
232 m_asio_socket.set_option(asio::ip::tcp::no_delay(true));
233 if (ec) {
234 SCP_FATAL(()) << " Failed to disable Nagle's Algorithm: " << ec.message();
235 }
236 }
237
238 void do_receive()
239 {
240 asio::error_code ec;
241
242 while (!m_stop_rcv_thread) {
243 if (p_server) {
244 // Server mode
245 m_acceptor.accept(m_asio_socket, ec);
246 switch (ec.value()) {
247 case 0: // Success
248 configure_socket();
249 SCP_DEBUG(()) << "Accepted connection from " << m_asio_socket.remote_endpoint().address() << ":"
250 << m_asio_socket.remote_endpoint().port();
251 break;
252 case asio::error::would_block:
253 SCP_INFO(()) << "Accept failed for non blocking:" << ec.message();
254 std::this_thread::sleep_for(m_retry_delay);
255 break;
256 default:
257 SCP_WARN(()) << "Accept connection failed with error:" << ec.message();
258 std::this_thread::sleep_for(m_retry_delay);
259 break;
260 }
261 } else if (!p_server) {
262 // Client mode
263 m_asio_socket.connect(m_local_endpoint, ec);
264 if (!ec) {
265 configure_socket();
266 } else {
267 SCP_INFO(()) << "failed to connect to " << m_local_endpoint.address() << ":"
268 << m_local_endpoint.port() << " " << ec.message();
269 m_asio_socket.close();
270 std::this_thread::sleep_for(m_retry_delay);
271 continue;
272 }
273 }
274
275 if (m_asio_socket.is_open()) receive_loop();
276 }
277
278 SCP_DEBUG(()) << "Leaving receive thread";
279 }
280
281 void receive_loop()
282 {
283 asio::error_code ec;
284
285 while (m_asio_socket.is_open()) {
286 size_t read_count = m_asio_socket.read_some(asio::buffer(m_buffer), ec);
287 switch (ec.value()) {
288 case 0: // Success
289 forward_incoming_data(read_count);
290 break;
291 case asio::error::would_block:
292 // Retry reading after a short delay
293 std::this_thread::sleep_for(m_retry_delay);
294 break;
295 case asio::error::eof:
296 SCP_DEBUG(()) << "Remote endpoint disconnected";
297 m_asio_socket.close();
298 break;
299 default:
300 SCP_WARN(()) << "Read failed: " << ec.message();
301 m_asio_socket.close();
302 break;
303 }
304 }
305
306 SCP_DEBUG(()) << "Leaving receive loop";
307 }
308
309 void forward_incoming_data(size_t data_length)
310 {
311 for (size_t i = 0; i < data_length; i++) {
312 unsigned char c = m_buffer[i];
313 if (p_sigquit && c == 0x1c) {
314 sc_core::sc_stop();
315 }
316 m_biflow_socket.enqueue(c);
317 }
318 }
319};
// C-linkage declaration of module_register(); its definition is not part of this header.
// NOTE(review): presumably the hook that registers this module with the module factory — confirm.
extern "C" void module_register();
321#endif
Definition target.h:160
Definition char_backend_socket.h:33
Definition biflow-socket.h:73
void can_receive_any()
can_receive_any Allow unlimited items to arrive.
Definition biflow-socket.h:263
void enqueue(T data)
enqueue Enqueue data to be sent (unlimited queue size) NOTE: Thread safe.
Definition biflow-socket.h:276
void register_b_transport(MODULE *mod, void(MODULE::*cb)(tlm::tlm_generic_payload &, sc_core::sc_time &))
Register b_transport to be called whenever data is received from the socket.
Definition biflow-socket.h:226