-
Notifications
You must be signed in to change notification settings - Fork 82
/
Copy pathconnection.py
2395 lines (2078 loc) · 89.8 KB
/
connection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# -----------------------------------------------------------------------------
# Copyright (c) 2020, 2025, Oracle and/or its affiliates.
#
# This software is dual-licensed to you under the Universal Permissive License
# (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl and Apache License
# 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose
# either license.
#
# If you elect to accept the software under the Apache License, Version 2.0,
# the following applies:
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
# connection.py
#
# Contains the Connection class and the factory method connect() used for
# establishing connections to the database.
#
# *** NOTICE *** This file is generated from a template and should not be
# modified directly. See build_from_template.py in the utils subdirectory for
# more information.
# -----------------------------------------------------------------------------
import collections
import functools
import ssl
from typing import Any, Callable, Type, Optional, Union
import oracledb
from . import __name__ as MODULE_NAME
from . import base_impl, constants, driver_mode, errors, thick_impl, thin_impl
from . import pool as pool_module
from .aq import AsyncQueue, Queue, MessageProperties
from .base_impl import DB_TYPE_BLOB, DB_TYPE_CLOB, DB_TYPE_NCLOB, DbType
from .connect_params import ConnectParams
from .cursor import AsyncCursor, Cursor
from .dbobject import DbObjectType, DbObject
from .lob import AsyncLOB, LOB
from .pipeline import Pipeline
from .soda import SodaDatabase
from .subscr import Subscription
# named tuple used for representing global transactions
Xid = collections.namedtuple(
"Xid", ["format_id", "global_transaction_id", "branch_qualifier"]
)
class BaseConnection:
__module__ = MODULE_NAME
_impl = None
def __init__(self):
self._version = None
def __repr__(self):
typ = self.__class__
cls_name = f"{typ.__module__}.{typ.__qualname__}"
if self._impl is None:
return f"<{cls_name} disconnected>"
elif self.username is None:
return f"<{cls_name} to externally identified user>"
return f"<{cls_name} to {self.username}@{self.dsn}>"
def _verify_connected(self) -> None:
"""
Verifies that the connection is connected to the database. If it is
not, an exception is raised.
"""
if self._impl is None:
errors._raise_err(errors.ERR_NOT_CONNECTED)
def _verify_xid(self, xid: Xid) -> None:
"""
Verifies that the supplied xid is of the correct type.
"""
if not isinstance(xid, Xid):
message = "expecting transaction id created with xid()"
raise TypeError(message)
@property
def action(self) -> None:
raise AttributeError("action is not readable")
@action.setter
def action(self, value: str) -> None:
"""
Specifies the action column in the v$session table. It is a string
attribute but the value None is also accepted and treated as an empty
string.
"""
self._verify_connected()
self._impl.set_action(value)
@property
def autocommit(self) -> bool:
"""
Specifies whether autocommit mode is on or off. When autocommit mode is
on, all statements are committed as soon as they have completed
executing successfully.
"""
self._verify_connected()
return self._impl.autocommit
@autocommit.setter
def autocommit(self, value: bool) -> None:
self._verify_connected()
self._impl.autocommit = value
@property
def call_timeout(self) -> int:
"""
Specifies the amount of time (in milliseconds) that a single round-trip
to the database may take before a timeout will occur. A value of 0
means that no timeout will take place.
"""
self._verify_connected()
return self._impl.get_call_timeout()
@call_timeout.setter
def call_timeout(self, value: int) -> None:
self._verify_connected()
self._impl.set_call_timeout(value)
def cancel(self) -> None:
"""
Break a long-running transaction.
"""
self._verify_connected()
self._impl.cancel()
@property
def client_identifier(self) -> None:
raise AttributeError("client_identifier is not readable")
@client_identifier.setter
def client_identifier(self, value: str) -> None:
"""
Specifies the client_identifier column in the v$session table.
"""
self._verify_connected()
self._impl.set_client_identifier(value)
@property
def clientinfo(self) -> None:
raise AttributeError("clientinfo is not readable")
@clientinfo.setter
def clientinfo(self, value: str) -> None:
"""
Specifies the client_info column in the v$session table.
"""
self._verify_connected()
self._impl.set_client_info(value)
@property
def current_schema(self) -> str:
"""
Specifies the current schema for the session. Setting this value is the
same as executing the SQL statement "ALTER SESSION SET CURRENT_SCHEMA".
The attribute is set (and verified) on the next call that does a round
trip to the server. The value is placed before unqualified database
objects in SQL statements you then execute.
"""
self._verify_connected()
return self._impl.get_current_schema()
@current_schema.setter
def current_schema(self, value: str) -> None:
self._verify_connected()
self._impl.set_current_schema(value)
@property
def dbop(self) -> None:
raise AttributeError("dbop is not readable")
@dbop.setter
def dbop(self, value: str) -> None:
"""
Specifies the database operation that is to be monitored. This can be
viewed in the DBOP_NAME column of the V$SQL_MONITOR table.
"""
self._verify_connected()
self._impl.set_dbop(value)
@property
def dsn(self) -> str:
"""
Specifies the connection string (TNS entry) of the database to which a
connection has been established.
"""
self._verify_connected()
return self._impl.dsn
@property
def econtext_id(self) -> None:
raise AttributeError("econtext_id is not readable")
@econtext_id.setter
def econtext_id(self, value: str) -> None:
"""
Specifies the execution context id. This value can be found as ecid in
the v$session table and econtext_id in the auditing tables. The maximum
length is 64 bytes.
"""
self._verify_connected()
self._impl.set_econtext_id(value)
@property
def db_domain(self) -> str:
"""
Specifies the name of the database domain.
"""
self._verify_connected()
return self._impl.get_db_domain()
@property
def db_name(self) -> str:
"""
Specifies the name of the database.
"""
self._verify_connected()
return self._impl.get_db_name()
@property
def session_id(self) -> int:
"""
Specifies the session identifier.
"""
self._verify_connected()
return self._impl.get_session_id()
@property
def serial_num(self) -> int:
"""
Specifies the session serial number.
"""
self._verify_connected()
return self._impl.get_serial_num()
@property
def edition(self) -> str:
"""
Specifies the session edition.
"""
self._verify_connected()
return self._impl.get_edition()
@property
def external_name(self) -> str:
"""
Specifies the external name that is used by the connection when logging
distributed transactions.
"""
self._verify_connected()
return self._impl.get_external_name()
@external_name.setter
def external_name(self, value: str) -> None:
self._verify_connected()
self._impl.set_external_name(value)
@property
def inputtypehandler(self) -> Callable:
"""
Specifies a method called for each value that is bound to a statement
executed on any cursor associated with this connection. The method
signature is handler(cursor, value, arraysize) and the return value is
expected to be a variable object or None in which case a default
variable object will be created. If this attribute is None, the default
behavior will take place for all values bound to statements.
"""
self._verify_connected()
return self._impl.inputtypehandler
@inputtypehandler.setter
def inputtypehandler(self, value: Callable) -> None:
self._verify_connected()
self._impl.inputtypehandler = value
@property
def instance_name(self) -> str:
"""
Returns the instance name associated with the connection. This is the
equivalent of the SQL expression:
sys_context('userenv', 'instance_name')
"""
self._verify_connected()
return self._impl.get_instance_name()
@property
def internal_name(self) -> str:
"""
Specifies the internal name that is used by the connection when logging
distributed transactions.
"""
self._verify_connected()
return self._impl.get_internal_name()
@internal_name.setter
def internal_name(self, value: str) -> None:
self._verify_connected()
self._impl.set_internal_name(value)
def is_healthy(self) -> bool:
"""
Returns a boolean indicating the health status of a connection.
Connections may become unusable in several cases, such as if the
network socket is broken, if an Oracle error indicates the connection
is unusable, or after receiving a planned down notification from the
database.
This function is best used before starting a new database request on an
existing standalone connection. Pooled connections internally perform
this check before returning a connection to the application.
If this function returns False, the connection should be not be used by
the application and a new connection should be established instead.
This function performs a local check. To fully check a connection's
health, use ping() which performs a round-trip to the database.
"""
return self._impl is not None and self._impl.get_is_healthy()
@property
def ltxid(self) -> bytes:
"""
Returns the logical transaction id for the connection. It is used
within Oracle Transaction Guard as a means of ensuring that
transactions are not duplicated. See the Oracle documentation and the
provided sample for more information.
"""
self._verify_connected()
return self._impl.get_ltxid()
@property
def max_identifier_length(self) -> int:
"""
Returns the maximum length of identifiers supported by the database to
which this connection has been established.
"""
self._verify_connected()
return self._impl.get_max_identifier_length()
@property
def max_open_cursors(self) -> int:
"""
Specifies the maximum number of cursors that the database can have open
concurrently.
"""
self._verify_connected()
return self._impl.get_max_open_cursors()
@property
def module(self) -> None:
raise AttributeError("module is not readable")
@module.setter
def module(self, value: str) -> None:
"""
Specifies the module column in the v$session table. The maximum length
for this string is 48 and if you exceed this length you will get
ORA-24960.
"""
self._verify_connected()
self._impl.set_module(value)
def msgproperties(
self,
payload: Optional[Union[bytes, str, DbObject]] = None,
correlation: Optional[str] = None,
delay: Optional[int] = None,
exceptionq: Optional[str] = None,
expiration: Optional[int] = None,
priority: Optional[int] = None,
recipients: Optional[list] = None,
) -> MessageProperties:
"""
Create and return a message properties object. If the parameters are
not None, they act as a shortcut for setting each of the equivalently
named properties.
"""
impl = self._impl.create_msg_props_impl()
props = MessageProperties._from_impl(impl)
if payload is not None:
props.payload = payload
if correlation is not None:
props.correlation = correlation
if delay is not None:
props.delay = delay
if exceptionq is not None:
props.exceptionq = exceptionq
if expiration is not None:
props.expiration = expiration
if priority is not None:
props.priority = priority
if recipients is not None:
props.recipients = recipients
return props
def queue(
self,
name: str,
payload_type: Optional[Union[DbObjectType, str]] = None,
*,
payloadType: Optional[DbObjectType] = None,
) -> Queue:
"""
Creates and returns a queue which is used to enqueue and dequeue
messages in Advanced Queueing (AQ).
The name parameter is expected to be a string identifying the queue in
which messages are to be enqueued or dequeued.
The payload_type parameter, if specified, is expected to be an
object type that identifies the type of payload the queue expects.
If the string "JSON" is specified, JSON data is enqueued and dequeued.
If not specified, RAW data is enqueued and dequeued.
"""
self._verify_connected()
payload_type_impl = None
is_json = False
if payloadType is not None:
if payload_type is not None:
errors._raise_err(
errors.ERR_DUPLICATED_PARAMETER,
deprecated_name="payloadType",
new_name="payload_type",
)
payload_type = payloadType
if payload_type is not None:
if payload_type == "JSON":
is_json = True
elif not isinstance(payload_type, DbObjectType):
raise TypeError("expecting DbObjectType")
else:
payload_type_impl = payload_type._impl
impl = self._impl.create_queue_impl()
impl.initialize(self._impl, name, payload_type_impl, is_json)
return self._create_queue(impl)
@property
def outputtypehandler(self) -> Callable:
"""
Specifies a method called for each column that is going to be fetched
from any cursor associated with this connection. The method signature
is handler(cursor, name, defaultType, length, precision, scale) and the
return value is expected to be a variable object or None in which case
a default variable object will be created. If this attribute is None,
the default behavior will take place for all columns fetched from
cursors associated with this connection.
"""
self._verify_connected()
return self._impl.outputtypehandler
@outputtypehandler.setter
def outputtypehandler(self, value: Callable) -> None:
self._verify_connected()
self._impl.outputtypehandler = value
@property
def sdu(self) -> int:
"""
Specifies the size of the Session Data Unit (SDU) that is being used by
the connection.
"""
self._verify_connected()
return self._impl.get_sdu()
@property
def service_name(self) -> str:
"""
Specifies the name of the service that was used to connect to the
database.
"""
self._verify_connected()
return self._impl.get_service_name()
@property
def stmtcachesize(self) -> int:
"""
Specifies the size of the statement cache. This value can make a
significant difference in performance (up to 100x) if you have a small
number of statements that you execute repeatedly.
"""
self._verify_connected()
return self._impl.get_stmt_cache_size()
@stmtcachesize.setter
def stmtcachesize(self, value: int) -> None:
self._verify_connected()
self._impl.set_stmt_cache_size(value)
@property
def thin(self) -> bool:
"""
Returns a boolean indicating if the connection was established in
python-oracledb's thin mode (True) or thick mode (False).
"""
self._verify_connected()
return self._impl.thin
@property
def transaction_in_progress(self) -> bool:
"""
Specifies whether a transaction is currently in progress on the
database using this connection.
"""
self._verify_connected()
return self._impl.get_transaction_in_progress()
@property
def username(self) -> str:
"""
Returns the name of the user which established the connection to the
database.
"""
self._verify_connected()
return self._impl.username
@property
def version(self) -> str:
"""
Returns the version of the database to which the connection has been
established.
"""
if self._version is None:
self._verify_connected()
self._version = ".".join(str(c) for c in self._impl.server_version)
return self._version
@property
def warning(self) -> errors._Error:
"""
Returns any warning that was generated when the connection was created,
or the value None if no warning was generated. The value will be
cleared for pooled connections after they are returned to the pool.
"""
self._verify_connected()
return self._impl.warning
def xid(
self,
format_id: int,
global_transaction_id: Union[bytes, str],
branch_qualifier: Union[bytes, str],
) -> Xid:
"""
Returns a global transaction identifier that can be used with the TPC
(two-phase commit) functions.
The format_id parameter should be a non-negative 32-bit integer. The
global_transaction_id and branch_qualifier parameters should be bytes
(or a string which will be UTF-8 encoded to bytes) of no more than 64
bytes.
"""
return Xid(format_id, global_transaction_id, branch_qualifier)
class Connection(BaseConnection):
__module__ = MODULE_NAME
def __init__(
self,
dsn: Optional[str] = None,
*,
pool: Optional["pool_module.ConnectionPool"] = None,
params: Optional[ConnectParams] = None,
**kwargs,
) -> None:
"""
Constructor for creating a connection to the database.
The dsn parameter (data source name) can be a string in the format
user/password@connect_string or can simply be the connect string (in
which case authentication credentials such as the username and password
need to be specified separately). See the documentation on connection
strings for more information.
The pool parameter is expected to be a pool object and the use of this
parameter is the equivalent of calling acquire() on the pool.
The params parameter is expected to be of type ConnectParams and
contains connection parameters that will be used when establishing the
connection. See the documentation on ConnectParams for more
information. If this parameter is not specified, the additional keyword
parameters will be used to create an instance of ConnectParams. If both
the params parameter and additional keyword parameters are specified,
the values in the keyword parameters have precedence. Note that if a
dsn is also supplied, then in the python-oracledb Thin mode, the values
of the parameters specified (if any) within the dsn will override the
values passed as additional keyword parameters, which themselves
override the values set in the params parameter object.
"""
super().__init__()
# determine if thin mode is being used
with driver_mode.get_manager() as mode_mgr:
thin = mode_mgr.thin
# determine which connection parameters to use
if params is None:
params_impl = base_impl.ConnectParamsImpl()
elif not isinstance(params, ConnectParams):
errors._raise_err(errors.ERR_INVALID_CONNECT_PARAMS)
else:
params_impl = params._impl.copy()
dsn = params_impl.process_args(dsn, kwargs, thin)
# see if connection is being acquired from a pool
if pool is None:
pool_impl = None
else:
pool._verify_open()
pool_impl = pool._impl
# create thin or thick implementation object
if thin:
if (
params_impl.shardingkey is not None
or params_impl.supershardingkey is not None
):
errors._raise_err(
errors.ERR_FEATURE_NOT_SUPPORTED,
feature="sharding",
driver_type="thick",
)
if pool is not None:
impl = pool_impl.acquire(params_impl)
else:
impl = thin_impl.ThinConnImpl(dsn, params_impl)
impl.connect(params_impl)
else:
impl = thick_impl.ThickConnImpl(dsn, params_impl)
impl.connect(params_impl, pool_impl)
self._impl = impl
# invoke callback, if applicable
if (
impl.invoke_session_callback
and pool is not None
and pool.session_callback is not None
and callable(pool.session_callback)
):
pool.session_callback(self, params_impl.tag)
impl.invoke_session_callback = False
def __del__(self):
if self._impl is not None:
self._impl.close(in_del=True)
self._impl = None
def __enter__(self):
self._verify_connected()
return self
def __exit__(self, exc_type, exc_value, exc_tb):
if self._impl is not None:
self._impl.close(in_del=True)
self._impl = None
def _create_queue(self, impl):
"""
Returns a queue object that the user can use to dequeue and enqueue
messages.
"""
return Queue._from_impl(self, impl)
def _get_oci_attr(
self, handle_type: int, attr_num: int, attr_type: int
) -> Any:
"""
Returns the value of the specified OCI attribute from the internal
handle. This is only supported in python-oracledb thick mode and should
only be used as directed by Oracle.
"""
self._verify_connected()
return self._impl._get_oci_attr(handle_type, attr_num, attr_type)
def _set_oci_attr(
self, handle_type: int, attr_num: int, attr_type: int, value: Any
) -> None:
"""
Sets the value of the specified OCI attribute on the internal handle.
This is only supported in python-oracledb thick mode and should only
be used as directed by Oracle.
"""
self._verify_connected()
self._impl._set_oci_attr(handle_type, attr_num, attr_type, value)
def begin(
self,
format_id: int = -1,
transaction_id: str = "",
branch_id: str = "",
) -> None:
"""
Deprecated. Use tpc_begin() instead.
"""
if format_id != -1:
self.tpc_begin(self.xid(format_id, transaction_id, branch_id))
@property
def callTimeout(self) -> int:
"""
Deprecated. Use property call_timeout instead.
"""
return self.call_timeout
@callTimeout.setter
def callTimeout(self, value: int) -> None:
self._verify_connected()
self._impl.set_call_timeout(value)
def changepassword(self, old_password: str, new_password: str) -> None:
"""
Changes the password for the user to which the connection is connected.
"""
self._verify_connected()
self._impl.change_password(old_password, new_password)
def close(self) -> None:
"""
Closes the connection and makes it unusable for further operations. An
Error exception will be raised if any operation is attempted with this
connection after this method completes successfully.
"""
self._verify_connected()
self._impl.close()
self._impl = None
def commit(self) -> None:
"""
Commits any pending transactions to the database.
"""
self._verify_connected()
self._impl.commit()
def createlob(
self, lob_type: DbType, data: Optional[Union[str, bytes]] = None
) -> LOB:
"""
Create and return a new temporary LOB of the specified type.
"""
self._verify_connected()
if lob_type not in (DB_TYPE_CLOB, DB_TYPE_NCLOB, DB_TYPE_BLOB):
message = (
"parameter should be one of oracledb.DB_TYPE_CLOB, "
"oracledb.DB_TYPE_BLOB or oracledb.DB_TYPE_NCLOB"
)
raise TypeError(message)
impl = self._impl.create_temp_lob_impl(lob_type)
lob = LOB._from_impl(impl)
if data:
lob.write(data)
return lob
def cursor(self, scrollable: bool = False) -> Cursor:
"""
Returns a cursor associated with the connection.
"""
self._verify_connected()
return Cursor(self, scrollable)
def decode_oson(self, data):
"""
Decode OSON-encoded bytes and return the object encoded in those bytes.
"""
self._verify_connected()
return self._impl.decode_oson(data)
def encode_oson(self, value):
"""
Return OSON-encoded bytes encoded from the supplied object.
"""
self._verify_connected()
return self._impl.encode_oson(value)
def fetch_df_all(
self,
statement: str,
parameters: Optional[Union[list, tuple, dict]] = None,
arraysize: Optional[int] = None,
):
"""
Fetch all data as OracleDataFrame.
"""
cursor = self.cursor()
cursor._impl.fetching_arrow = True
if arraysize is not None:
cursor.arraysize = arraysize
cursor.prefetchrows = cursor.arraysize
cursor.execute(statement, parameters)
return cursor._impl.fetch_df_all(cursor)
def fetch_df_batches(
self,
statement: str,
parameters: Optional[Union[list, tuple, dict]] = None,
size: Optional[int] = None,
):
"""
Fetch data in batches. Each batch is an OracleDataFrame
"""
cursor = self.cursor()
cursor._impl.fetching_arrow = True
if size is not None:
cursor.arraysize = size
cursor.prefetchrows = cursor.arraysize
cursor.execute(statement, parameters)
if size is None:
yield cursor._impl.fetch_df_all(cursor)
else:
yield from cursor._impl.fetch_df_batches(cursor, batch_size=size)
def getSodaDatabase(self) -> SodaDatabase:
"""
Return a SODA database object for performing all operations on Simple
Oracle Document Access (SODA).
"""
self._verify_connected()
db_impl = self._impl.create_soda_database_impl(self)
return SodaDatabase._from_impl(self, db_impl)
def gettype(self, name: str) -> DbObjectType:
"""
Return a type object given its name. This can then be used to create
objects which can be bound to cursors created by this connection.
"""
self._verify_connected()
obj_type_impl = self._impl.get_type(self, name)
return DbObjectType._from_impl(obj_type_impl)
@property
def handle(self) -> int:
"""
Returns the OCI service context handle for the connection. It is
primarily provided to facilitate testing the creation of a connection
using the OCI service context handle.
This property is only relevant to python-oracledb's thick mode.
"""
self._verify_connected()
return self._impl.get_handle()
@property
def maxBytesPerCharacter(self) -> int:
"""
Deprecated. Use the constant value 4 instead.
"""
return 4
def ping(self) -> None:
"""
Pings the database to verify the connection is valid.
"""
self._verify_connected()
self._impl.ping()
def prepare(self) -> bool:
"""
Deprecated. Use tpc_prepare() instead.
"""
return self.tpc_prepare()
@property
def proxy_user(self) -> Union[str, None]:
"""
Returns the name of the proxy user, if applicable.
"""
self._verify_connected()
return self._impl.proxy_user
def rollback(self) -> None:
"""
Rolls back any pending transactions.
"""
self._verify_connected()
self._impl.rollback()
def shutdown(self, mode: int = 0) -> None:
"""
Shutdown the database. In order to do this the connection must be
connected as SYSDBA or SYSOPER. Two calls must be made unless the mode
specified is DBSHUTDOWN_ABORT.
"""
self._verify_connected()
self._impl.shutdown(mode)
def startup(
self,
force: bool = False,
restrict: bool = False,
pfile: Optional[str] = None,
) -> None:
"""
Startup the database. This is equivalent to the SQL*Plus command
“startup nomount”. The connection must be connected as SYSDBA or
SYSOPER with the PRELIM_AUTH option specified for this to work.
The pfile parameter, if specified, is expected to be a string
identifying the location of the parameter file (PFILE) which will be
used instead of the stored parameter file (SPFILE).
"""
self._verify_connected()
self._impl.startup(force, restrict, pfile)
def subscribe(
self,
namespace: int = constants.SUBSCR_NAMESPACE_DBCHANGE,
protocol: int = constants.SUBSCR_PROTO_CALLBACK,
callback: Optional[Callable] = None,
timeout: int = 0,
operations: int = constants.OPCODE_ALLOPS,
port: int = 0,
qos: int = constants.SUBSCR_QOS_DEFAULT,
ip_address: Optional[str] = None,
grouping_class: int = constants.SUBSCR_GROUPING_CLASS_NONE,
grouping_value: int = 0,
grouping_type: int = constants.SUBSCR_GROUPING_TYPE_SUMMARY,
name: Optional[str] = None,
client_initiated: bool = False,
*,
ipAddress: Optional[str] = None,
groupingClass: int = constants.SUBSCR_GROUPING_CLASS_NONE,
groupingValue: int = 0,
groupingType: int = constants.SUBSCR_GROUPING_TYPE_SUMMARY,
clientInitiated: bool = False,
) -> Subscription:
"""
Return a new subscription object that receives notification for events
that take place in the database that match the given parameters.
The namespace parameter specifies the namespace the subscription uses.
It can be one of SUBSCR_NAMESPACE_DBCHANGE or SUBSCR_NAMESPACE_AQ.
The protocol parameter specifies the protocol to use when notifications
are sent. Currently the only valid value is SUBSCR_PROTO_CALLBACK.
The callback is expected to be a callable that accepts a single
parameter. A message object is passed to this callback whenever a
notification is received.
The timeout value specifies that the subscription expires after the
given time in seconds. The default value of 0 indicates that the
subscription never expires.
The operations parameter enables filtering of the messages that are
sent (insert, update, delete). The default value will send
notifications for all operations. This parameter is only used when the
namespace is set to SUBSCR_NAMESPACE_DBCHANGE.
The port parameter specifies the listening port for callback
notifications from the database server. If not specified, an unused
port will be selected by the Oracle Client libraries.
The qos parameter specifies quality of service options. It should be
one or more of the following flags, OR'ed together:
SUBSCR_QOS_RELIABLE,
SUBSCR_QOS_DEREG_NFY,
SUBSCR_QOS_ROWIDS,
SUBSCR_QOS_QUERY,
SUBSCR_QOS_BEST_EFFORT.
The ip_address parameter specifies the IP address (IPv4 or IPv6) in
standard string notation to bind for callback notifications from the
database server. If not specified, the client IP address will be
determined by the Oracle Client libraries.
The grouping_class parameter specifies what type of grouping of
notifications should take place. Currently, if set, this value can
only be set to the value SUBSCR_GROUPING_CLASS_TIME, which will group
notifications by the number of seconds specified in the grouping_value
parameter. The grouping_type parameter should be one of the values
SUBSCR_GROUPING_TYPE_SUMMARY (the default) or
SUBSCR_GROUPING_TYPE_LAST.
The name parameter is used to identify the subscription and is specific
to the selected namespace. If the namespace parameter is
SUBSCR_NAMESPACE_DBCHANGE then the name is optional and can be any
value. If the namespace parameter is SUBSCR_NAMESPACE_AQ, however, the
name must be in the format '<QUEUE_NAME>' for single consumer queues
and '<QUEUE_NAME>:<CONSUMER_NAME>' for multiple consumer queues, and