-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Expand file tree
/
Copy path_dummy_thread.py
More file actions
338 lines (255 loc) · 8.74 KB
/
_dummy_thread.py
File metadata and controls
338 lines (255 loc) · 8.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
"""Drop-in replacement for the thread module.
Meant to be used as a brain-dead substitute so that threaded code does
not need to be rewritten for when the thread module is not present.
Suggested usage is::
try:
import _thread
except ImportError:
import _dummy_thread as _thread
"""
# Exports only things specified by thread documentation;
# skipping obsolete synonyms allocate(), start_new(), exit_thread().
__all__ = [
"error",
"start_new_thread",
"exit",
"get_ident",
"allocate_lock",
"interrupt_main",
"LockType",
"RLock",
"_count",
"start_joinable_thread",
"daemon_threads_allowed",
"_shutdown",
"_make_thread_handle",
"_ThreadHandle",
"_get_main_thread_ident",
"_is_main_interpreter",
"_local",
]
# A dummy value
TIMEOUT_MAX = 2**31
# Main thread ident for dummy implementation
_MAIN_THREAD_IDENT = -1
# NOTE: this module can be imported early in the extension building process,
# and so top level imports of other modules should be avoided. Instead, all
# imports are done when needed on a function-by-function basis. Since threads
# are disabled, the import lock should not be an issue anyway (??).
error = RuntimeError
def start_new_thread(function, args, kwargs={}):
"""Dummy implementation of _thread.start_new_thread().
Compatibility is maintained by making sure that ``args`` is a
tuple and ``kwargs`` is a dictionary. If an exception is raised
and it is SystemExit (which can be done by _thread.exit()) it is
caught and nothing is done; all other exceptions are printed out
by using traceback.print_exc().
If the executed function calls interrupt_main the KeyboardInterrupt will be
raised when the function returns.
"""
if type(args) != type(tuple()):
raise TypeError("2nd arg must be a tuple")
if type(kwargs) != type(dict()):
raise TypeError("3rd arg must be a dict")
global _main
_main = False
try:
function(*args, **kwargs)
except SystemExit:
pass
except:
import traceback
traceback.print_exc()
_main = True
global _interrupt
if _interrupt:
_interrupt = False
raise KeyboardInterrupt
def start_joinable_thread(function, handle=None, daemon=True):
"""Dummy implementation of _thread.start_joinable_thread().
In dummy thread, we just run the function synchronously.
"""
if handle is None:
handle = _ThreadHandle()
try:
function()
except SystemExit:
pass
except:
import traceback
traceback.print_exc()
handle._set_done()
return handle
def daemon_threads_allowed():
"""Dummy implementation of _thread.daemon_threads_allowed()."""
return True
def _shutdown():
"""Dummy implementation of _thread._shutdown()."""
pass
def _make_thread_handle(ident):
"""Dummy implementation of _thread._make_thread_handle()."""
handle = _ThreadHandle()
handle._ident = ident
return handle
def _get_main_thread_ident():
"""Dummy implementation of _thread._get_main_thread_ident()."""
return _MAIN_THREAD_IDENT
def _is_main_interpreter():
"""Dummy implementation of _thread._is_main_interpreter()."""
return True
def exit():
"""Dummy implementation of _thread.exit()."""
raise SystemExit
def get_ident():
"""Dummy implementation of _thread.get_ident().
Since this module should only be used when _threadmodule is not
available, it is safe to assume that the current process is the
only thread. Thus a constant can be safely returned.
"""
return _MAIN_THREAD_IDENT
def allocate_lock():
"""Dummy implementation of _thread.allocate_lock()."""
return LockType()
def stack_size(size=None):
"""Dummy implementation of _thread.stack_size()."""
if size is not None:
raise error("setting thread stack size not supported")
return 0
def _set_sentinel():
"""Dummy implementation of _thread._set_sentinel()."""
return LockType()
def _count():
"""Dummy implementation of _thread._count()."""
return 0
class LockType(object):
"""Class implementing dummy implementation of _thread.LockType.
Compatibility is maintained by maintaining self.locked_status
which is a boolean that stores the state of the lock. Pickling of
the lock, though, should not be done since if the _thread module is
then used with an unpickled ``lock()`` from here problems could
occur from this class not having atomic methods.
"""
def __init__(self):
self.locked_status = False
def acquire(self, waitflag=None, timeout=-1):
"""Dummy implementation of acquire().
For blocking calls, self.locked_status is automatically set to
True and returned appropriately based on value of
``waitflag``. If it is non-blocking, then the value is
actually checked and not set if it is already acquired. This
is all done so that threading.Condition's assert statements
aren't triggered and throw a little fit.
"""
if waitflag is None or waitflag:
self.locked_status = True
return True
else:
if not self.locked_status:
self.locked_status = True
return True
else:
if timeout > 0:
import time
time.sleep(timeout)
return False
__enter__ = acquire
def __exit__(self, typ, val, tb):
self.release()
def release(self):
"""Release the dummy lock."""
# XXX Perhaps shouldn't actually bother to test? Could lead
# to problems for complex, threaded code.
if not self.locked_status:
raise error
self.locked_status = False
return True
def locked(self):
return self.locked_status
def _at_fork_reinit(self):
self.locked_status = False
def __repr__(self):
return "<%s %s.%s object at %s>" % (
"locked" if self.locked_status else "unlocked",
self.__class__.__module__,
self.__class__.__qualname__,
hex(id(self)),
)
class _ThreadHandle:
"""Dummy implementation of _thread._ThreadHandle."""
def __init__(self):
self._ident = _MAIN_THREAD_IDENT
self._done = False
@property
def ident(self):
return self._ident
def _set_done(self):
self._done = True
def is_done(self):
return self._done
def join(self, timeout=None):
# In dummy thread, thread is always done
return
def __repr__(self):
return f"<_ThreadHandle ident={self._ident}>"
# Used to signal that interrupt_main was called in a "thread"
_interrupt = False
# True when not executing in a "thread"
_main = True
def interrupt_main():
"""Set _interrupt flag to True to have start_new_thread raise
KeyboardInterrupt upon exiting."""
if _main:
raise KeyboardInterrupt
else:
global _interrupt
_interrupt = True
class RLock:
def __init__(self):
self.locked_count = 0
def acquire(self, waitflag=None, timeout=-1):
self.locked_count += 1
return True
__enter__ = acquire
def __exit__(self, typ, val, tb):
self.release()
def release(self):
if not self.locked_count:
raise error
self.locked_count -= 1
return True
def locked(self):
return self.locked_count != 0
def __repr__(self):
return "<%s %s.%s object owner=%s count=%s at %s>" % (
"locked" if self.locked_count else "unlocked",
self.__class__.__module__,
self.__class__.__qualname__,
get_ident() if self.locked_count else 0,
self.locked_count,
hex(id(self)),
)
class _local:
"""Dummy implementation of _thread._local (thread-local storage)."""
def __init__(self):
object.__setattr__(self, "_local__impl", {})
def __getattribute__(self, name):
if name.startswith("_local__"):
return object.__getattribute__(self, name)
impl = object.__getattribute__(self, "_local__impl")
try:
return impl[name]
except KeyError:
raise AttributeError(name)
def __setattr__(self, name, value):
if name.startswith("_local__"):
return object.__setattr__(self, name, value)
impl = object.__getattribute__(self, "_local__impl")
impl[name] = value
def __delattr__(self, name):
if name.startswith("_local__"):
return object.__delattr__(self, name)
impl = object.__getattribute__(self, "_local__impl")
try:
del impl[name]
except KeyError:
raise AttributeError(name)