@@ -168,6 +168,7 @@ def _one(seq):
168
168
return x
169
169
170
170
171
+ @attrs .define (frozen = False )
171
172
class ThreadLocalInterpreter :
172
173
"""An interpeter used to execute a sequence of queries within the same thread and cursor.
173
174
@@ -177,11 +178,6 @@ class ThreadLocalInterpreter:
177
178
compiler : Compiler
178
179
gen : Generator
179
180
180
- def __init__ (self , compiler : Compiler , gen : Generator ):
181
- super ().__init__ ()
182
- self .gen = gen
183
- self .compiler = compiler
184
-
185
181
def apply_queries (self , callback : Callable [[str ], Any ]):
186
182
q : Expr = next (self .gen )
187
183
while True :
@@ -205,6 +201,7 @@ def apply_query(callback: Callable[[str], Any], sql_code: Union[str, ThreadLocal
205
201
return callback (sql_code )
206
202
207
203
204
+ @attrs .define (frozen = False )
208
205
class Mixin_Schema (AbstractMixin_Schema ):
209
206
def table_information (self ) -> Compilable :
210
207
return table ("information_schema" , "tables" )
@@ -221,6 +218,7 @@ def list_tables(self, table_schema: str, like: Compilable = None) -> Compilable:
221
218
)
222
219
223
220
221
+ @attrs .define (frozen = False )
224
222
class Mixin_RandomSample (AbstractMixin_RandomSample ):
225
223
def random_sample_n (self , tbl : ITable , size : int ) -> ITable :
226
224
# TODO use a more efficient algorithm, when the table count is known
@@ -230,15 +228,17 @@ def random_sample_ratio_approx(self, tbl: ITable, ratio: float) -> ITable:
230
228
return tbl .where (Random () < ratio )
231
229
232
230
231
+ @attrs .define (frozen = False )
233
232
class Mixin_OptimizerHints (AbstractMixin_OptimizerHints ):
234
233
def optimizer_hints (self , hints : str ) -> str :
235
234
return f"/*+ { hints } */ "
236
235
237
236
237
+ @attrs .define (frozen = False )
238
238
class BaseDialect (abc .ABC ):
239
239
SUPPORTS_PRIMARY_KEY : ClassVar [bool ] = False
240
240
SUPPORTS_INDEXES : ClassVar [bool ] = False
241
- TYPE_CLASSES : ClassVar [Dict [str , type ]] = {}
241
+ TYPE_CLASSES : ClassVar [Dict [str , Type [ ColType ] ]] = {}
242
242
243
243
PLACEHOLDER_TABLE = None # Used for Oracle
244
244
@@ -539,7 +539,7 @@ def render_select(self, parent_c: Compiler, elem: Select) -> str:
539
539
540
540
def render_join (self , parent_c : Compiler , elem : Join ) -> str :
541
541
tables = [
542
- t if isinstance (t , TableAlias ) else TableAlias (source_table = t , name = parent_c .new_unique_name ())
542
+ t if isinstance (t , TableAlias ) else TableAlias (t , name = parent_c .new_unique_name ())
543
543
for t in elem .source_tables
544
544
]
545
545
c = parent_c .add_table_context (* tables , in_join = True , in_select = False )
@@ -827,6 +827,7 @@ def __getitem__(self, i):
827
827
return self .rows [i ]
828
828
829
829
830
+ @attrs .define (frozen = False , kw_only = True )
830
831
class Database (abc .ABC ):
831
832
"""Base abstract class for databases.
832
833
@@ -1102,22 +1103,22 @@ def is_autocommit(self) -> bool:
1102
1103
"Return whether the database autocommits changes. When false, COMMIT statements are skipped."
1103
1104
1104
1105
1106
+ @attrs .define (frozen = False )
1105
1107
class ThreadedDatabase (Database ):
1106
1108
"""Access the database through singleton threads.
1107
1109
1108
1110
Used for database connectors that do not support sharing their connection between different threads.
1109
1111
"""
1110
1112
1111
- _init_error : Optional [Exception ]
1112
- _queue : ThreadPoolExecutor
1113
- thread_local : threading .local
1113
+ thread_count : int = 1
1114
+
1115
+ _init_error : Optional [Exception ] = None
1116
+ _queue : Optional [ThreadPoolExecutor ] = None
1117
+ thread_local : threading .local = attrs .field (factory = threading .local )
1114
1118
1115
- def __init__ (self , thread_count = 1 ):
1116
- super ().__init__ ()
1117
- self ._init_error = None
1118
- self ._queue = ThreadPoolExecutor (thread_count , initializer = self .set_conn )
1119
- self .thread_local = threading .local ()
1120
- logger .info (f"[{ self .name } ] Starting a threadpool, size={ thread_count } ." )
1119
+ def __attrs_post_init__ (self ):
1120
+ self ._queue = ThreadPoolExecutor (self .thread_count , initializer = self .set_conn )
1121
+ logger .info (f"[{ self .name } ] Starting a threadpool, size={ self .thread_count } ." )
1121
1122
1122
1123
def set_conn (self ):
1123
1124
assert not hasattr (self .thread_local , "conn" )
0 commit comments