LLDB  mainline
ThreadedCommunication.cpp
Go to the documentation of this file.
1 //===-- ThreadedCommunication.cpp -----------------------------------------===//
2 //
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6 //
7 //===----------------------------------------------------------------------===//
8 
#include "lldb/Core/ThreadedCommunication.h"

#include "lldb/Host/ThreadLauncher.h"
#include "lldb/Utility/Connection.h"
#include "lldb/Utility/ConstString.h"
#include "lldb/Utility/Event.h"
#include "lldb/Utility/LLDBLog.h"
#include "lldb/Utility/Listener.h"
#include "lldb/Utility/Log.h"
#include "lldb/Utility/Status.h"

#include "llvm/Support/Compiler.h"

#include <algorithm>
#include <chrono>
#include <cstring>
#include <memory>

#include <cassert>
#include <cerrno>
#include <cinttypes>
#include <cstdio>
30 
31 using namespace lldb;
32 using namespace lldb_private;
33 
34 ConstString &ThreadedCommunication::GetStaticBroadcasterClass() {
35  static ConstString class_name("lldb.communication");
36  return class_name;
37 }
38 
39 ThreadedCommunication::ThreadedCommunication(const char *name)
40  : Communication(), Broadcaster(nullptr, name), m_read_thread_enabled(false),
41  m_read_thread_did_exit(false), m_bytes(), m_bytes_mutex(),
42  m_synchronize_mutex(), m_callback(nullptr), m_callback_baton(nullptr) {
44  "{0} ThreadedCommunication::ThreadedCommunication (name = {1})",
45  this, name);
46 
47  SetEventName(eBroadcastBitDisconnected, "disconnected");
48  SetEventName(eBroadcastBitReadThreadGotBytes, "got bytes");
49  SetEventName(eBroadcastBitReadThreadDidExit, "read thread did exit");
50  SetEventName(eBroadcastBitReadThreadShouldExit, "read thread should exit");
51  SetEventName(eBroadcastBitPacketAvailable, "packet available");
52  SetEventName(eBroadcastBitNoMorePendingInput, "no more pending input");
53 
55 }
56 
59  "{0} ThreadedCommunication::~ThreadedCommunication (name = {1})",
60  this, GetBroadcasterName().AsCString());
61 }
62 
64  SetReadThreadBytesReceivedCallback(nullptr, nullptr);
65  StopReadThread(nullptr);
67 }
68 
71  "Disconnecting while the read thread is running is racy!");
72  return Communication::Disconnect(error_ptr);
73 }
74 
75 size_t ThreadedCommunication::Read(void *dst, size_t dst_len,
76  const Timeout<std::micro> &timeout,
77  ConnectionStatus &status,
78  Status *error_ptr) {
80  LLDB_LOG(
81  log,
82  "this = {0}, dst = {1}, dst_len = {2}, timeout = {3}, connection = {4}",
83  this, dst, dst_len, timeout, m_connection_sp.get());
84 
86  // We have a dedicated read thread that is getting data for us
87  size_t cached_bytes = GetCachedBytes(dst, dst_len);
88  if (cached_bytes > 0) {
89  status = eConnectionStatusSuccess;
90  return cached_bytes;
91  }
92  if (timeout && timeout->count() == 0) {
93  if (error_ptr)
94  error_ptr->SetErrorString("Timed out.");
96  return 0;
97  }
98 
99  if (!m_connection_sp) {
100  if (error_ptr)
101  error_ptr->SetErrorString("Invalid connection.");
103  return 0;
104  }
105 
106  // No data yet, we have to start listening.
107  ListenerSP listener_sp(
108  Listener::MakeListener("ThreadedCommunication::Read"));
109  listener_sp->StartListeningForEvents(
110  this, eBroadcastBitReadThreadGotBytes | eBroadcastBitReadThreadDidExit);
111 
112  // Re-check for data, as it might have arrived while we were setting up our
113  // listener.
114  cached_bytes = GetCachedBytes(dst, dst_len);
115  if (cached_bytes > 0) {
116  status = eConnectionStatusSuccess;
117  return cached_bytes;
118  }
119 
120  EventSP event_sp;
121  // Explicitly check for the thread exit, for the same reason.
123  // We've missed the event, lets just conjure one up.
124  event_sp = std::make_shared<Event>(eBroadcastBitReadThreadDidExit);
125  } else {
126  if (!listener_sp->GetEvent(event_sp, timeout)) {
127  if (error_ptr)
128  error_ptr->SetErrorString("Timed out.");
129  status = eConnectionStatusTimedOut;
130  return 0;
131  }
132  }
133  const uint32_t event_type = event_sp->GetType();
134  if (event_type & eBroadcastBitReadThreadGotBytes) {
135  return GetCachedBytes(dst, dst_len);
136  }
137 
138  if (event_type & eBroadcastBitReadThreadDidExit) {
139  // If the thread exited of its own accord, it either means it
140  // hit an end-of-file condition or an error.
141  status = m_pass_status;
142  if (error_ptr)
143  *error_ptr = std::move(m_pass_error);
144 
145  if (GetCloseOnEOF())
146  Disconnect(nullptr);
147  return 0;
148  }
149  llvm_unreachable("Got unexpected event type!");
150  }
151 
152  // We aren't using a read thread, just read the data synchronously in this
153  // thread.
154  return Communication::Read(dst, dst_len, timeout, status, error_ptr);
155 }
156 
158  if (error_ptr)
159  error_ptr->Clear();
160 
162  return true;
163 
165  "{0} ThreadedCommunication::StartReadThread ()", this);
166 
167  const std::string thread_name =
168  llvm::formatv("<lldb.comm.{0}>", GetBroadcasterName());
169 
170  m_read_thread_enabled = true;
171  m_read_thread_did_exit = false;
172  auto maybe_thread = ThreadLauncher::LaunchThread(
173  thread_name, [this] { return ReadThread(); });
174  if (maybe_thread) {
175  m_read_thread = *maybe_thread;
176  } else {
177  if (error_ptr)
178  *error_ptr = Status(maybe_thread.takeError());
179  else {
180  LLDB_LOG(GetLog(LLDBLog::Host), "failed to launch host thread: {}",
181  llvm::toString(maybe_thread.takeError()));
182  }
183  }
184 
185  if (!m_read_thread.IsJoinable())
186  m_read_thread_enabled = false;
187 
188  return m_read_thread_enabled;
189 }
190 
192  if (!m_read_thread.IsJoinable())
193  return true;
194 
196  "{0} ThreadedCommunication::StopReadThread ()", this);
197 
198  m_read_thread_enabled = false;
199 
200  BroadcastEvent(eBroadcastBitReadThreadShouldExit, nullptr);
201 
202  // error = m_read_thread.Cancel();
203 
204  Status error = m_read_thread.Join(nullptr);
205  return error.Success();
206 }
207 
209  if (!m_read_thread.IsJoinable())
210  return true;
211 
212  Status error = m_read_thread.Join(nullptr);
213  return error.Success();
214 }
215 
216 size_t ThreadedCommunication::GetCachedBytes(void *dst, size_t dst_len) {
217  std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex);
218  if (!m_bytes.empty()) {
219  // If DST is nullptr and we have a thread, then return the number of bytes
220  // that are available so the caller can call again
221  if (dst == nullptr)
222  return m_bytes.size();
223 
224  const size_t len = std::min<size_t>(dst_len, m_bytes.size());
225 
226  ::memcpy(dst, m_bytes.c_str(), len);
227  m_bytes.erase(m_bytes.begin(), m_bytes.begin() + len);
228 
229  return len;
230  }
231  return 0;
232 }
233 
234 void ThreadedCommunication::AppendBytesToCache(const uint8_t *bytes, size_t len,
235  bool broadcast,
236  ConnectionStatus status) {
238  "{0} ThreadedCommunication::AppendBytesToCache (src = {1}, src_len "
239  "= {2}, "
240  "broadcast = {3})",
241  this, bytes, (uint64_t)len, broadcast);
242  if ((bytes == nullptr || len == 0) &&
244  return;
245  if (m_callback) {
246  // If the user registered a callback, then call it and do not broadcast
247  m_callback(m_callback_baton, bytes, len);
248  } else if (bytes != nullptr && len > 0) {
249  std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex);
250  m_bytes.append((const char *)bytes, len);
251  if (broadcast)
252  BroadcastEventIfUnique(eBroadcastBitReadThreadGotBytes);
253  }
254 }
255 
257  return m_read_thread_enabled;
258 }
259 
262 
263  LLDB_LOG(log, "Communication({0}) thread starting...", this);
264 
265  uint8_t buf[1024];
266 
267  Status error;
269  bool done = false;
270  bool disconnect = false;
271  while (!done && m_read_thread_enabled) {
272  size_t bytes_read = ReadFromConnection(
273  buf, sizeof(buf), std::chrono::seconds(5), status, &error);
274  if (bytes_read > 0 || status == eConnectionStatusEndOfFile)
275  AppendBytesToCache(buf, bytes_read, true, status);
276 
277  switch (status) {
279  break;
280 
282  done = true;
283  disconnect = GetCloseOnEOF();
284  break;
285  case eConnectionStatusError: // Check GetError() for details
286  if (error.GetType() == eErrorTypePOSIX && error.GetError() == EIO) {
287  // EIO on a pipe is usually caused by remote shutdown
288  disconnect = GetCloseOnEOF();
289  done = true;
290  }
291  if (error.Fail())
292  LLDB_LOG(log, "error: {0}, status = {1}", error,
294  break;
295  case eConnectionStatusInterrupted: // Synchronization signal from
296  // SynchronizeWithReadThread()
297  // The connection returns eConnectionStatusInterrupted only when there is
298  // no input pending to be read, so we can signal that.
299  BroadcastEvent(eBroadcastBitNoMorePendingInput);
300  break;
301  case eConnectionStatusNoConnection: // No connection
302  case eConnectionStatusLostConnection: // Lost connection while connected to
303  // a valid connection
304  done = true;
305  [[fallthrough]];
306  case eConnectionStatusTimedOut: // Request timed out
307  if (error.Fail())
308  LLDB_LOG(log, "error: {0}, status = {1}", error,
310  break;
311  }
312  }
313  m_pass_status = status;
314  m_pass_error = std::move(error);
315  LLDB_LOG(log, "Communication({0}) thread exiting...", this);
316 
317  // Start shutting down. We need to do this in a very specific order to ensure
318  // we don't race with threads wanting to read/synchronize with us.
319 
320  // First, we signal our intent to exit. This ensures no new thread start
321  // waiting on events from us.
322  m_read_thread_did_exit = true;
323 
324  // Unblock any existing thread waiting for the synchronization signal.
325  BroadcastEvent(eBroadcastBitNoMorePendingInput);
326 
327  {
328  // Wait for the synchronization thread to finish...
329  std::lock_guard<std::mutex> guard(m_synchronize_mutex);
330  // ... and disconnect.
331  if (disconnect)
332  Disconnect();
333  }
334 
335  // Finally, unblock any readers waiting for us to exit.
336  BroadcastEvent(eBroadcastBitReadThreadDidExit);
337  return {};
338 }
339 
341  ReadThreadBytesReceived callback, void *callback_baton) {
342  m_callback = callback;
343  m_callback_baton = callback_baton;
344 }
345 
347  // Only one thread can do the synchronization dance at a time.
348  std::lock_guard<std::mutex> guard(m_synchronize_mutex);
349 
350  // First start listening for the synchronization event.
351  ListenerSP listener_sp(Listener::MakeListener(
352  "ThreadedCommunication::SyncronizeWithReadThread"));
353  listener_sp->StartListeningForEvents(this, eBroadcastBitNoMorePendingInput);
354 
355  // If the thread is not running, there is no point in synchronizing.
357  return;
358 
359  // Notify the read thread.
360  m_connection_sp->InterruptRead();
361 
362  // Wait for the synchronization event.
363  EventSP event_sp;
364  listener_sp->GetEvent(event_sp, std::nullopt);
365 }
366 
368  std::unique_ptr<Connection> connection) {
369  StopReadThread(nullptr);
370  Communication::SetConnection(std::move(connection));
371 }
lldb_private::toString
const char * toString(AppleArm64ExceptionClass EC)
Definition: AppleArm64ExceptionClass.h:38
lldb_private::ThreadedCommunication::SynchronizeWithReadThread
void SynchronizeWithReadThread()
Wait for the read thread to process all outstanding data.
Definition: ThreadedCommunication.cpp:346
lldb_private::Broadcaster::CheckInWithManager
void CheckInWithManager()
Definition: Broadcaster.cpp:46
lldb_private::ThreadedCommunication::ReadThread
lldb::thread_result_t ReadThread()
The read thread function.
Definition: ThreadedCommunication.cpp:260
lldb_private::ThreadedCommunication::m_read_thread_did_exit
std::atomic< bool > m_read_thread_did_exit
Definition: ThreadedCommunication.h:229
lldb_private::ThreadLauncher::LaunchThread
static llvm::Expected< HostThread > LaunchThread(llvm::StringRef name, std::function< lldb::thread_result_t()> thread_function, size_t min_stack_byte_size=0)
Definition: ThreadLauncher.cpp:25
lldb_private::ThreadedCommunication::ReadThreadIsRunning
bool ReadThreadIsRunning()
Checks if there is a currently running read thread.
Definition: ThreadedCommunication.cpp:256
lldb_private::ThreadedCommunication::SetConnection
void SetConnection(std::unique_ptr< Connection > connection) override
Sets the connection that it to be used by this class.
Definition: ThreadedCommunication.cpp:367
lldb_private::ThreadedCommunication::StopReadThread
virtual bool StopReadThread(Status *error_ptr=nullptr)
Stops the read thread by cancelling it.
Definition: ThreadedCommunication.cpp:191
lldb_private::ThreadedCommunication::m_synchronize_mutex
std::mutex m_synchronize_mutex
Definition: ThreadedCommunication.h:237
lldb_private::Communication::ConnectionStatusAsString
static std::string ConnectionStatusAsString(lldb::ConnectionStatus status)
Definition: Communication.cpp:147
lldb_private::Broadcaster::SetEventName
void SetEventName(uint32_t event_mask, const char *name)
Set the name for an event bit.
Definition: Broadcaster.h:337
lldb_private::ThreadedCommunication::Clear
void Clear() override
Definition: ThreadedCommunication.cpp:63
lldb_private::ThreadedCommunication::m_bytes
std::string m_bytes
A buffer to cache bytes read in the ReadThread function.
Definition: ThreadedCommunication.h:231
lldb_private::ThreadedCommunication::m_pass_error
Status m_pass_error
Error passthrough from read thread.
Definition: ThreadedCommunication.h:236
ReadThreadBytesReceived
static void ReadThreadBytesReceived(void *baton, const void *src, size_t src_len)
Definition: ScriptInterpreter.cpp:142
lldb_private::LLDBLog::Host
@ Host
lldb_private::ThreadedCommunication::JoinReadThread
virtual bool JoinReadThread(Status *error_ptr=nullptr)
Definition: ThreadedCommunication.cpp:208
lldb_private::Communication::m_connection_sp
lldb::ConnectionSP m_connection_sp
The connection that is current in use by this communications class.
Definition: Communication.h:168
lldb::eConnectionStatusNoConnection
@ eConnectionStatusNoConnection
No connection.
Definition: lldb-enumerations.h:301
Listener.h
ThreadLauncher.h
lldb::eConnectionStatusEndOfFile
@ eConnectionStatusEndOfFile
End-of-file encountered.
Definition: lldb-enumerations.h:298
lldb::eErrorTypePOSIX
@ eErrorTypePOSIX
POSIX error codes.
Definition: lldb-enumerations.h:311
lldb_private::Communication::SetConnection
virtual void SetConnection(std::unique_ptr< Connection > connection)
Sets the connection that it to be used by this class.
Definition: Communication.cpp:141
lldb_private::Communication
Definition: Communication.h:39
lldb_private::ThreadedCommunication::m_bytes_mutex
std::recursive_mutex m_bytes_mutex
A mutex to protect multi-threaded access to the cached bytes.
Definition: ThreadedCommunication.h:232
error
static llvm::raw_ostream & error(Stream &strm)
Definition: CommandReturnObject.cpp:17
lldb_private::ThreadedCommunication::m_callback
ReadThreadBytesReceived m_callback
Definition: ThreadedCommunication.h:238
lldb_private::Communication::ReadFromConnection
size_t ReadFromConnection(void *dst, size_t dst_len, const Timeout< std::micro > &timeout, lldb::ConnectionStatus &status, Status *error_ptr)
Definition: Communication.cpp:127
lldb_private::ThreadedCommunication::SetReadThreadBytesReceivedCallback
void SetReadThreadBytesReceivedCallback(ReadThreadBytesReceived callback, void *callback_baton)
Definition: ThreadedCommunication.cpp:340
Log.h
lldb_private::Communication::Clear
virtual void Clear()
Definition: Communication.cpp:37
lldb_private::Broadcaster::BroadcastEventIfUnique
void BroadcastEventIfUnique(lldb::EventSP &event_sp)
Definition: Broadcaster.h:266
lldb_private::ConstString
Definition: ConstString.h:39
lldb_private::ThreadedCommunication::~ThreadedCommunication
~ThreadedCommunication() override
Destructor.
Definition: ThreadedCommunication.cpp:57
lldb::eConnectionStatusTimedOut
@ eConnectionStatusTimedOut
Request timed out.
Definition: lldb-enumerations.h:300
lldb::ConnectionStatus
ConnectionStatus
Connection Status Types.
Definition: lldb-enumerations.h:296
lldb::eConnectionStatusInterrupted
@ eConnectionStatusInterrupted
Interrupted read.
Definition: lldb-enumerations.h:304
string
string(SUBSTRING ${p} 10 -1 pStripped) if($
Definition: Plugins/CMakeLists.txt:40
Event.h
Connection.h
lldb_private::ThreadedCommunication::m_read_thread
HostThread m_read_thread
The read thread handle in case we need to cancel the thread.
Definition: ThreadedCommunication.h:226
lldb_private::ThreadedCommunication::StartReadThread
virtual bool StartReadThread(Status *error_ptr=nullptr)
Starts a read thread whose sole purpose it to read bytes from the current connection.
Definition: ThreadedCommunication.cpp:157
lldb_private::Broadcaster::BroadcastEvent
void BroadcastEvent(lldb::EventSP &event_sp)
Broadcast an event which has no associated data.
Definition: Broadcaster.h:262
lldb_private::LLDBLog::Communication
@ Communication
lldb_private::Communication::Disconnect
virtual lldb::ConnectionStatus Disconnect(Status *error_ptr=nullptr)
Disconnect the communications connection if one is currently connected.
Definition: Communication.cpp:55
lldb_private::HostThread::Join
Status Join(lldb::thread_result_t *result)
Definition: HostThread.cpp:20
lldb_private::Status
Definition: Status.h:44
lldb_private::Timeout< std::micro >
lldb_private::Communication::Read
virtual size_t Read(void *dst, size_t dst_len, const Timeout< std::micro > &timeout, lldb::ConnectionStatus &status, Status *error_ptr)
Read bytes from the current connection.
Definition: Communication.cpp:86
lldb_private::ThreadedCommunication::m_pass_status
lldb::ConnectionStatus m_pass_status
Connection status passthrough from read thread.
Definition: ThreadedCommunication.h:234
uint32_t
lldb::thread_result_t
void * thread_result_t
Definition: lldb-types.h:62
lldb::eConnectionStatusError
@ eConnectionStatusError
Check GetError() for details.
Definition: lldb-enumerations.h:299
lldb_private::LLDBLog::Object
@ Object
ThreadedCommunication.h
lldb_private::Status::SetErrorString
void SetErrorString(llvm::StringRef err_str)
Set the current error string to err_str.
Definition: Status.cpp:241
lldb_private::ThreadedCommunication::Read
size_t Read(void *dst, size_t dst_len, const Timeout< std::micro > &timeout, lldb::ConnectionStatus &status, Status *error_ptr) override
Read bytes from the current connection.
Definition: ThreadedCommunication.cpp:75
lldb_private::ThreadedCommunication::AppendBytesToCache
virtual void AppendBytesToCache(const uint8_t *src, size_t src_len, bool broadcast, lldb::ConnectionStatus status)
Append new bytes that get read from the read thread into the internal object byte cache.
Definition: ThreadedCommunication.cpp:234
lldb_private::Communication::GetCloseOnEOF
bool GetCloseOnEOF() const
Definition: Communication.h:163
lldb_private::ThreadedCommunication::m_read_thread_enabled
std::atomic< bool > m_read_thread_enabled
Definition: ThreadedCommunication.h:228
LLDB_LOG
#define LLDB_LOG(log,...)
The LLDB_LOG* macros defined below are the way to emit log messages.
Definition: Log.h:337
lldb_private::ThreadedCommunication::Disconnect
lldb::ConnectionStatus Disconnect(Status *error_ptr=nullptr) override
Disconnect the communications connection if one is currently connected.
Definition: ThreadedCommunication.cpp:69
lldb::eConnectionStatusSuccess
@ eConnectionStatusSuccess
Success.
Definition: lldb-enumerations.h:297
Status.h
lldb_private::Broadcaster::GetBroadcasterName
ConstString GetBroadcasterName()
Get the NULL terminated C string name of this Broadcaster object.
Definition: Broadcaster.h:317
lldb_private::HostThread::IsJoinable
bool IsJoinable() const
Definition: HostThread.cpp:30
lldb_private
A class that represents a running process on the host machine.
Definition: SBCommandInterpreterRunOptions.h:16
lldb_private::Status::Clear
void Clear()
Clear the object state.
Definition: Status.cpp:167
ConstString.h
lldb::eConnectionStatusLostConnection
@ eConnectionStatusLostConnection
Lost connection while connected to a valid connection.
Definition: lldb-enumerations.h:302
lldb_private::Log
Definition: Log.h:115
lldb_private::Listener::MakeListener
static lldb::ListenerSP MakeListener(const char *name)
Definition: Listener.cpp:460
lldb_private::GetLog
Log * GetLog(Cat mask)
Retrieve the Log object for the channel associated with the given log enum.
Definition: Log.h:309
lldb_private::Broadcaster
Definition: Broadcaster.h:242
lldb_private::ThreadedCommunication::m_callback_baton
void * m_callback_baton
Definition: ThreadedCommunication.h:239
lldb
Definition: SBAddress.h:15
LLDBLog.h
lldb_private::ThreadedCommunication::GetCachedBytes
size_t GetCachedBytes(void *dst, size_t dst_len)
Get any available bytes from our data cache.
Definition: ThreadedCommunication.cpp:216