LLDB mainline
ThreadedCommunication.cpp
Go to the documentation of this file.
1//===-- ThreadedCommunication.cpp -----------------------------------------===//
2//
3// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4// See https://llvm.org/LICENSE.txt for license information.
5// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6//
7//===----------------------------------------------------------------------===//
8
10
14#include "lldb/Utility/Event.h"
17#include "lldb/Utility/Log.h"
18#include "lldb/Utility/Status.h"
19#ifdef _WIN32
21#endif
22
23#include "llvm/Support/Compiler.h"
24
25#include <algorithm>
26#include <chrono>
27#include <cstring>
28#include <memory>
29#include <shared_mutex>
30
31#include <cerrno>
32#include <cinttypes>
33#include <cstdio>
34
35using namespace lldb;
36using namespace lldb_private;
37
39 static constexpr llvm::StringLiteral class_name("lldb.communication");
40 return class_name;
41}
42
44 : Communication(), Broadcaster(nullptr, name), m_read_thread_enabled(false),
46 m_synchronize_mutex(), m_callback(nullptr), m_callback_baton(nullptr) {
48 "{0} ThreadedCommunication::ThreadedCommunication (name = {1})",
49 this, name);
50
51 SetEventName(eBroadcastBitDisconnected, "disconnected");
52 SetEventName(eBroadcastBitReadThreadGotBytes, "got bytes");
53 SetEventName(eBroadcastBitReadThreadDidExit, "read thread did exit");
54 SetEventName(eBroadcastBitReadThreadShouldExit, "read thread should exit");
55 SetEventName(eBroadcastBitPacketAvailable, "packet available");
56 SetEventName(eBroadcastBitNoMorePendingInput, "no more pending input");
57
59}
60
63 "{0} ThreadedCommunication::~ThreadedCommunication (name = {1})",
64 this, GetBroadcasterName());
65}
66
72
75 "Disconnecting while the read thread is running is racy!");
76 return Communication::Disconnect(error_ptr);
77}
78
79size_t ThreadedCommunication::Read(void *dst, size_t dst_len,
80 const Timeout<std::micro> &timeout,
81 ConnectionStatus &status,
82 Status *error_ptr) {
85 log,
86 "this = {0}, dst = {1}, dst_len = {2}, timeout = {3}, connection = {4}",
87 this, dst, dst_len, timeout, m_connection_sp.get());
88
90 // We have a dedicated read thread that is getting data for us
91 size_t cached_bytes = GetCachedBytes(dst, dst_len);
92 if (cached_bytes > 0) {
94 return cached_bytes;
95 }
96 if (timeout && timeout->count() == 0) {
97 if (error_ptr)
98 *error_ptr = Status::FromErrorString("Timed out.");
100 return 0;
101 }
102
103 if (!m_connection_sp) {
104 if (error_ptr)
105 *error_ptr = Status::FromErrorString("Invalid connection.");
107 return 0;
108 }
109
110 // No data yet, we have to start listening.
111 ListenerSP listener_sp(
112 Listener::MakeListener("ThreadedCommunication::Read"));
113 listener_sp->StartListeningForEvents(
114 this, eBroadcastBitReadThreadGotBytes | eBroadcastBitReadThreadDidExit);
115
116 // Re-check for data, as it might have arrived while we were setting up our
117 // listener.
118 cached_bytes = GetCachedBytes(dst, dst_len);
119 if (cached_bytes > 0) {
121 return cached_bytes;
122 }
123
124 EventSP event_sp;
125 // Explicitly check for the thread exit, for the same reason.
127 // We've missed the event, lets just conjure one up.
128 event_sp = std::make_shared<Event>(eBroadcastBitReadThreadDidExit);
129 } else {
130 if (!listener_sp->GetEvent(event_sp, timeout)) {
131 if (error_ptr)
132 *error_ptr = Status::FromErrorString("Timed out.");
134 return 0;
135 }
136 }
137 const uint32_t event_type = event_sp->GetType();
138 if (event_type & eBroadcastBitReadThreadGotBytes) {
139 return GetCachedBytes(dst, dst_len);
140 }
141
142 if (event_type & eBroadcastBitReadThreadDidExit) {
143 // If the thread exited of its own accord, it either means it
144 // hit an end-of-file condition or an error.
145 status = m_pass_status;
146 if (error_ptr)
147 *error_ptr = std::move(m_pass_error);
148
149 if (GetCloseOnEOF())
150 Disconnect(nullptr);
151 return 0;
152 }
153 llvm_unreachable("Got unexpected event type!");
154 }
155
156 // We aren't using a read thread, just read the data synchronously in this
157 // thread.
158 return Communication::Read(dst, dst_len, timeout, status, error_ptr);
159}
160
162 std::lock_guard<std::mutex> lock(m_read_thread_mutex);
163
164 if (error_ptr)
165 error_ptr->Clear();
166
167 if (m_read_thread.IsJoinable())
168 return true;
169
171 "{0} ThreadedCommunication::StartReadThread ()", this);
172
173 const std::string thread_name =
174 llvm::formatv("<lldb.comm.{0}>", GetBroadcasterName());
175
178 auto maybe_thread = ThreadLauncher::LaunchThread(
179 thread_name, [this] { return ReadThread(); });
180 if (maybe_thread) {
181 m_read_thread = *maybe_thread;
182 } else {
183 if (error_ptr)
184 *error_ptr = Status::FromError(maybe_thread.takeError());
185 else {
186 LLDB_LOG_ERROR(GetLog(LLDBLog::Host), maybe_thread.takeError(),
187 "failed to launch host thread: {0}");
188 }
189 }
190
191 if (!m_read_thread.IsJoinable())
192 m_read_thread_enabled = false;
193
195}
196
198 std::lock_guard<std::mutex> lock(m_read_thread_mutex);
199
200 if (!m_read_thread.IsJoinable())
201 return true;
202
204 "{0} ThreadedCommunication::StopReadThread ()", this);
205
206 m_read_thread_enabled = false;
207
208 BroadcastEvent(eBroadcastBitReadThreadShouldExit, nullptr);
209
210 Status error = m_read_thread.Join(nullptr);
211 return error.Success();
212}
213
215 std::lock_guard<std::mutex> lock(m_read_thread_mutex);
216
217 if (!m_read_thread.IsJoinable())
218 return true;
219
220 Status error = m_read_thread.Join(nullptr);
221 return error.Success();
222}
223
224size_t ThreadedCommunication::GetCachedBytes(void *dst, size_t dst_len) {
225 std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex);
226 if (!m_bytes.empty()) {
227 // If DST is nullptr and we have a thread, then return the number of bytes
228 // that are available so the caller can call again
229 if (dst == nullptr)
230 return m_bytes.size();
231
232 const size_t len = std::min<size_t>(dst_len, m_bytes.size());
233
234 ::memcpy(dst, m_bytes.c_str(), len);
235 m_bytes.erase(m_bytes.begin(), m_bytes.begin() + len);
236
237 return len;
238 }
239 return 0;
240}
241
242void ThreadedCommunication::AppendBytesToCache(const uint8_t *bytes, size_t len,
243 bool broadcast,
244 ConnectionStatus status) {
246 "{0} ThreadedCommunication::AppendBytesToCache (src = {1}, src_len "
247 "= {2}, "
248 "broadcast = {3})",
249 this, bytes, (uint64_t)len, broadcast);
250 if ((bytes == nullptr || len == 0) &&
252 return;
253 if (m_callback) {
254 // If the user registered a callback, then call it and do not broadcast
255 m_callback(m_callback_baton, bytes, len);
256 } else if (bytes != nullptr && len > 0) {
257 std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex);
258 m_bytes.append((const char *)bytes, len);
259 if (broadcast)
260 BroadcastEventIfUnique(eBroadcastBitReadThreadGotBytes);
261 }
262}
263
267
270
271 LLDB_LOG(log, "Communication({0}) thread starting...", this);
272
273 uint8_t buf[1024];
274
277 bool done = false;
278 bool disconnect = false;
279 while (!done && m_read_thread_enabled) {
280 size_t bytes_read = ReadFromConnection(
281 buf, sizeof(buf), std::chrono::seconds(5), status, &error);
282 if (bytes_read > 0 || status == eConnectionStatusEndOfFile)
283 AppendBytesToCache(buf, bytes_read, true, status);
284
285 switch (status) {
287 break;
288
290 done = true;
291 disconnect = GetCloseOnEOF();
292 break;
293 case eConnectionStatusError: // Check GetError() for details
294 if (error.GetType() == eErrorTypePOSIX && error.GetError() == EIO) {
295 // EIO on a pipe is usually caused by remote shutdown
296 disconnect = GetCloseOnEOF();
297 done = true;
298 }
299#ifdef _WIN32
300 if (error.GetType() == eErrorTypeWin32 &&
301 error.GetError() == ERROR_INVALID_HANDLE) {
302 // ERROR_INVALID_HANDLE on a pipe is usually caused by a remote shutdown
303 // of the pipe's ConPTY
304 disconnect = GetCloseOnEOF();
305 done = true;
306 }
307#endif
308 if (error.Fail())
309 LLDB_LOG(log, "error: {0}, status = {1}", error,
311 break;
312 case eConnectionStatusInterrupted: // Synchronization signal from
313 // SynchronizeWithReadThread()
314 // The connection returns eConnectionStatusInterrupted only when there is
315 // no input pending to be read, so we can signal that.
316 BroadcastEvent(eBroadcastBitNoMorePendingInput);
317 break;
318 case eConnectionStatusNoConnection: // No connection
319 case eConnectionStatusLostConnection: // Lost connection while connected to
320 // a valid connection
321 done = true;
322 [[fallthrough]];
323 case eConnectionStatusTimedOut: // Request timed out
324 if (error.Fail())
325 LLDB_LOG(log, "error: {0}, status = {1}", error,
327 break;
328 }
329 }
330 m_pass_status = status;
331 m_pass_error = std::move(error);
332 LLDB_LOG(log, "Communication({0}) thread exiting...", this);
333
334 // Start shutting down. We need to do this in a very specific order to ensure
335 // we don't race with threads wanting to read/synchronize with us.
336
337 // First, we signal our intent to exit. This ensures no new thread start
338 // waiting on events from us.
340
341 // Unblock any existing thread waiting for the synchronization signal.
342 BroadcastEvent(eBroadcastBitNoMorePendingInput);
343
344 {
345 // Wait for the synchronization thread to finish...
346 std::lock_guard<std::mutex> guard(m_synchronize_mutex);
347 // ... and disconnect.
348 if (disconnect)
349 Disconnect();
350 }
351
352 // Finally, unblock any readers waiting for us to exit.
353 BroadcastEvent(eBroadcastBitReadThreadDidExit);
354 return {};
355}
356
358 ReadThreadBytesReceived callback, void *callback_baton) {
359 m_callback = callback;
360 m_callback_baton = callback_baton;
361}
362
364 m_connection_sp->InterruptRead();
365}
366
368 // Only one thread can do the synchronization dance at a time.
369 std::lock_guard<std::mutex> guard(m_synchronize_mutex);
370
371 // First start listening for the synchronization event.
373 "ThreadedCommunication::SyncronizeWithReadThread"));
374 listener_sp->StartListeningForEvents(this, eBroadcastBitNoMorePendingInput);
375
376 // If the thread is not running, there is no point in synchronizing.
378 return;
379
380 // Notify the read thread.
382
383 // Wait for the synchronization event.
384 EventSP event_sp;
385 listener_sp->GetEvent(event_sp, std::nullopt);
386}
387
389 std::unique_ptr<Connection> connection) {
390 StopReadThread(nullptr);
391 Communication::SetConnection(std::move(connection));
392}
static llvm::raw_ostream & error(Stream &strm)
#define LLDB_LOG(log,...)
The LLDB_LOG* macros defined below are the way to emit log messages.
Definition Log.h:369
#define LLDB_LOG_ERROR(log, error,...)
Definition Log.h:392
Broadcaster(lldb::BroadcasterManagerSP manager_sp, std::string name)
Construct with a broadcaster with a name.
void SetEventName(uint32_t event_mask, const char *name)
Set the name for an event bit.
const std::string & GetBroadcasterName()
Get this broadcaster's name.
void BroadcastEventIfUnique(lldb::EventSP &event_sp)
void BroadcastEvent(lldb::EventSP &event_sp)
Broadcast an event which has no associated data.
size_t ReadFromConnection(void *dst, size_t dst_len, const Timeout< std::micro > &timeout, lldb::ConnectionStatus &status, Status *error_ptr)
virtual size_t Read(void *dst, size_t dst_len, const Timeout< std::micro > &timeout, lldb::ConnectionStatus &status, Status *error_ptr)
Read bytes from the current connection.
lldb::ConnectionSP m_connection_sp
The connection that is current in use by this communications class.
virtual void SetConnection(std::unique_ptr< Connection > connection)
Sets the connection that it to be used by this class.
virtual lldb::ConnectionStatus Disconnect(Status *error_ptr=nullptr)
Disconnect the communications connection if one is currently connected.
static std::string ConnectionStatusAsString(lldb::ConnectionStatus status)
static lldb::ListenerSP MakeListener(const char *name)
Definition Listener.cpp:375
An error handling class.
Definition Status.h:118
void Clear()
Clear the object state.
Definition Status.cpp:214
static Status FromErrorString(const char *str)
Definition Status.h:141
static Status FromError(llvm::Error error)
Avoid using this in new code. Migrate APIs to llvm::Expected instead.
Definition Status.cpp:136
static llvm::Expected< HostThread > LaunchThread(llvm::StringRef name, std::function< lldb::thread_result_t()> thread_function, size_t min_stack_byte_size=0)
virtual void AppendBytesToCache(const uint8_t *src, size_t src_len, bool broadcast, lldb::ConnectionStatus status)
Append new bytes that get read from the read thread into the internal object byte cache.
size_t Read(void *dst, size_t dst_len, const Timeout< std::micro > &timeout, lldb::ConnectionStatus &status, Status *error_ptr) override
Read bytes from the current connection.
void SetConnection(std::unique_ptr< Connection > connection) override
Sets the connection that it to be used by this class.
ThreadedCommunication(const char *broadcaster_name)
Construct the ThreadedCommunication object with the specified name for the Broadcaster that this obje...
std::recursive_mutex m_bytes_mutex
A mutex to protect multi-threaded access to the cached bytes.
HostThread m_read_thread
The read thread handle in case we need to cancel the thread.
virtual bool StopReadThread(Status *error_ptr=nullptr)
Stops the read thread by cancelling it.
void SynchronizeWithReadThread()
Wait for the read thread to process all outstanding data.
Communication()
Construct the Communication object.
std::atomic< bool > m_read_thread_did_exit
Whether the read thread is enabled.
lldb::ConnectionStatus m_pass_status
Connection status passthrough from read thread.
lldb::ConnectionStatus Disconnect(Status *error_ptr=nullptr) override
Disconnect the communications connection if one is currently connected.
void(* ReadThreadBytesReceived)(void *baton, const void *src, size_t src_len)
Status m_pass_error
Error passthrough from read thread.
virtual bool StartReadThread(Status *error_ptr=nullptr)
Starts a read thread whose sole purpose it to read bytes from the current connection.
std::atomic< bool > m_read_thread_enabled
Whether the read thread is enabled.
static llvm::StringRef GetStaticBroadcasterClass()
virtual bool JoinReadThread(Status *error_ptr=nullptr)
void InterruptRead()
Interrupts the current read.
std::string m_bytes
A buffer to cache bytes read in the ReadThread function.
size_t GetCachedBytes(void *dst, size_t dst_len)
Get any available bytes from our data cache.
bool ReadThreadIsRunning()
Checks if there is a currently running read thread.
lldb::thread_result_t ReadThread()
The read thread function.
void SetReadThreadBytesReceivedCallback(ReadThreadBytesReceived callback, void *callback_baton)
A class that represents a running process on the host machine.
Log * GetLog(Cat mask)
Retrieve the Log object for the channel associated with the given log enum.
Definition Log.h:332
void * thread_result_t
Definition lldb-types.h:62
ConnectionStatus
Connection Status Types.
@ eConnectionStatusError
Check GetError() for details.
@ eConnectionStatusInterrupted
Interrupted read.
@ eConnectionStatusTimedOut
Request timed out.
@ eConnectionStatusEndOfFile
End-of-file encountered.
@ eConnectionStatusSuccess
Success.
@ eConnectionStatusLostConnection
Lost connection while connected to a valid connection.
@ eConnectionStatusNoConnection
No connection.
@ eErrorTypeWin32
Standard Win32 error codes.
@ eErrorTypePOSIX
POSIX error codes.
std::shared_ptr< lldb_private::Event > EventSP
std::shared_ptr< lldb_private::Listener > ListenerSP