@@ -168,6 +168,7 @@ def _one(seq):
168
168
return x
169
169
170
170
171
+ @attrs .define (frozen = False )
171
172
class ThreadLocalInterpreter :
172
173
"""An interpeter used to execute a sequence of queries within the same thread and cursor.
173
174
@@ -177,11 +178,6 @@ class ThreadLocalInterpreter:
177
178
compiler : Compiler
178
179
gen : Generator
179
180
180
- def __init__ (self , compiler : Compiler , gen : Generator ):
181
- super ().__init__ ()
182
- self .gen = gen
183
- self .compiler = compiler
184
-
185
181
def apply_queries (self , callback : Callable [[str ], Any ]):
186
182
q : Expr = next (self .gen )
187
183
while True :
@@ -205,6 +201,7 @@ def apply_query(callback: Callable[[str], Any], sql_code: Union[str, ThreadLocal
205
201
return callback (sql_code )
206
202
207
203
204
+ @attrs .define (frozen = False )
208
205
class Mixin_Schema (AbstractMixin_Schema ):
209
206
def table_information (self ) -> Compilable :
210
207
return table ("information_schema" , "tables" )
@@ -221,6 +218,7 @@ def list_tables(self, table_schema: str, like: Compilable = None) -> Compilable:
221
218
)
222
219
223
220
221
+ @attrs .define (frozen = False )
224
222
class Mixin_RandomSample (AbstractMixin_RandomSample ):
225
223
def random_sample_n (self , tbl : ITable , size : int ) -> ITable :
226
224
# TODO use a more efficient algorithm, when the table count is known
@@ -230,15 +228,17 @@ def random_sample_ratio_approx(self, tbl: ITable, ratio: float) -> ITable:
230
228
return tbl .where (Random () < ratio )
231
229
232
230
231
+ @attrs .define (frozen = False )
233
232
class Mixin_OptimizerHints (AbstractMixin_OptimizerHints ):
234
233
def optimizer_hints (self , hints : str ) -> str :
235
234
return f"/*+ { hints } */ "
236
235
237
236
237
+ @attrs .define (frozen = False )
238
238
class BaseDialect (abc .ABC ):
239
239
SUPPORTS_PRIMARY_KEY : ClassVar [bool ] = False
240
240
SUPPORTS_INDEXES : ClassVar [bool ] = False
241
- TYPE_CLASSES : ClassVar [Dict [str , type ]] = {}
241
+ TYPE_CLASSES : ClassVar [Dict [str , Type [ ColType ] ]] = {}
242
242
MIXINS = frozenset ()
243
243
244
244
PLACEHOLDER_TABLE = None # Used for Oracle
@@ -540,7 +540,7 @@ def render_select(self, parent_c: Compiler, elem: Select) -> str:
540
540
541
541
def render_join (self , parent_c : Compiler , elem : Join ) -> str :
542
542
tables = [
543
- t if isinstance (t , TableAlias ) else TableAlias (source_table = t , name = parent_c .new_unique_name ())
543
+ t if isinstance (t , TableAlias ) else TableAlias (t , name = parent_c .new_unique_name ())
544
544
for t in elem .source_tables
545
545
]
546
546
c = parent_c .add_table_context (* tables , in_join = True , in_select = False )
@@ -839,6 +839,7 @@ def __getitem__(self, i):
839
839
return self .rows [i ]
840
840
841
841
842
+ @attrs .define (frozen = False )
842
843
class Database (abc .ABC ):
843
844
"""Base abstract class for databases.
844
845
@@ -1114,22 +1115,22 @@ def is_autocommit(self) -> bool:
1114
1115
"Return whether the database autocommits changes. When false, COMMIT statements are skipped."
1115
1116
1116
1117
1118
+ @attrs .define (frozen = False )
1117
1119
class ThreadedDatabase (Database ):
1118
1120
"""Access the database through singleton threads.
1119
1121
1120
1122
Used for database connectors that do not support sharing their connection between different threads.
1121
1123
"""
1122
1124
1123
- _init_error : Optional [Exception ]
1124
- _queue : ThreadPoolExecutor
1125
- thread_local : threading .local
1125
+ thread_count : int = 1
1126
+
1127
+ _init_error : Optional [Exception ] = None
1128
+ _queue : Optional [ThreadPoolExecutor ] = None
1129
+ thread_local : threading .local = attrs .field (factory = threading .local )
1126
1130
1127
- def __init__ (self , thread_count = 1 ):
1128
- super ().__init__ ()
1129
- self ._init_error = None
1130
- self ._queue = ThreadPoolExecutor (thread_count , initializer = self .set_conn )
1131
- self .thread_local = threading .local ()
1132
- logger .info (f"[{ self .name } ] Starting a threadpool, size={ thread_count } ." )
1131
+ def __attrs_post_init__ (self ):
1132
+ self ._queue = ThreadPoolExecutor (self .thread_count , initializer = self .set_conn )
1133
+ logger .info (f"[{ self .name } ] Starting a threadpool, size={ self .thread_count } ." )
1133
1134
1134
1135
def set_conn (self ):
1135
1136
assert not hasattr (self .thread_local , "conn" )
0 commit comments