@@ -944,6 +944,173 @@ def choose_lines(source, number, seed=None, generator=random):
944
944
"""
945
945
946
946
947
+ class ZlibDecompressorTest ():
948
+ # Test adopted from test_bz2.py
949
+ TEXT = HAMLET_SCENE
950
+ DATA = zlib .compress (HAMLET_SCENE )
951
+ BAD_DATA = b"Not a valid deflate block"
952
+ def test_Constructor (self ):
953
+ self .assertRaises (TypeError , zlib ._ZlibDecompressor , 42 )
954
+
955
+ def testDecompress (self ):
956
+ zlibd = zlib ._ZlibDecompressor ()
957
+ self .assertRaises (TypeError , zlibd .decompress )
958
+ text = zlibd .decompress (self .DATA )
959
+ self .assertEqual (text , self .TEXT )
960
+
961
+ def testDecompressChunks10 (self ):
962
+ zlibd = zlib ._ZlibDecompressor ()
963
+ text = b''
964
+ n = 0
965
+ while True :
966
+ str = self .DATA [n * 10 :(n + 1 )* 10 ]
967
+ if not str :
968
+ break
969
+ text += zlibd .decompress (str )
970
+ n += 1
971
+ self .assertEqual (text , self .TEXT )
972
+
973
+ def testDecompressUnusedData (self ):
974
+ zlibd = zlib ._ZlibDecompressor ()
975
+ unused_data = b"this is unused data"
976
+ text = zlibd .decompress (self .DATA + unused_data )
977
+ self .assertEqual (text , self .TEXT )
978
+ self .assertEqual (zlibd .unused_data , unused_data )
979
+
980
+ def testEOFError (self ):
981
+ zlibd = zlib ._ZlibDecompressor ()
982
+ text = zlibd .decompress (self .DATA )
983
+ self .assertRaises (EOFError , zlibd .decompress , b"anything" )
984
+ self .assertRaises (EOFError , zlibd .decompress , b"" )
985
+
986
+ @support .skip_if_pgo_task
987
+ @bigmemtest (size = _4G + 100 , memuse = 3.3 )
988
+ def testDecompress4G (self , size ):
989
+ # "Test zlib._ZlibDecompressor.decompress() with >4GiB input"
990
+ blocksize = 10 * 1024 * 1024
991
+ block = random .randbytes (blocksize )
992
+ try :
993
+ data = block * (size // blocksize + 1 )
994
+ compressed = zlib .compress (data )
995
+ zlibd = zlib ._ZlibDecompressor ()
996
+ decompressed = zlibd .decompress (compressed )
997
+ self .assertTrue (decompressed == data )
998
+ finally :
999
+ data = None
1000
+ compressed = None
1001
+ decompressed = None
1002
+
1003
+ def testPickle (self ):
1004
+ for proto in range (pickle .HIGHEST_PROTOCOL + 1 ):
1005
+ with self .assertRaises (TypeError ):
1006
+ pickle .dumps (zlib ._ZlibDecompressor (), proto )
1007
+
1008
+ def testDecompressorChunksMaxsize (self ):
1009
+ zlibd = zlib ._ZlibDecompressor ()
1010
+ max_length = 100
1011
+ out = []
1012
+
1013
+ # Feed some input
1014
+ len_ = len (self .BIG_DATA ) - 64
1015
+ out .append (zlibd .decompress (self .BIG_DATA [:len_ ],
1016
+ max_length = max_length ))
1017
+ self .assertFalse (zlibd .needs_input )
1018
+ self .assertEqual (len (out [- 1 ]), max_length )
1019
+
1020
+ # Retrieve more data without providing more input
1021
+ out .append (zlibd .decompress (b'' , max_length = max_length ))
1022
+ self .assertFalse (zlibd .needs_input )
1023
+ self .assertEqual (len (out [- 1 ]), max_length )
1024
+
1025
+ # Retrieve more data while providing more input
1026
+ out .append (zlibd .decompress (self .BIG_DATA [len_ :],
1027
+ max_length = max_length ))
1028
+ self .assertLessEqual (len (out [- 1 ]), max_length )
1029
+
1030
+ # Retrieve remaining uncompressed data
1031
+ while not zlibd .eof :
1032
+ out .append (zlibd .decompress (b'' , max_length = max_length ))
1033
+ self .assertLessEqual (len (out [- 1 ]), max_length )
1034
+
1035
+ out = b"" .join (out )
1036
+ self .assertEqual (out , self .BIG_TEXT )
1037
+ self .assertEqual (zlibd .unused_data , b"" )
1038
+
1039
+ def test_decompressor_inputbuf_1 (self ):
1040
+ # Test reusing input buffer after moving existing
1041
+ # contents to beginning
1042
+ zlibd = zlib ._ZlibDecompressor ()
1043
+ out = []
1044
+
1045
+ # Create input buffer and fill it
1046
+ self .assertEqual (zlibd .decompress (self .DATA [:100 ],
1047
+ max_length = 0 ), b'' )
1048
+
1049
+ # Retrieve some results, freeing capacity at beginning
1050
+ # of input buffer
1051
+ out .append (zlibd .decompress (b'' , 2 ))
1052
+
1053
+ # Add more data that fits into input buffer after
1054
+ # moving existing data to beginning
1055
+ out .append (zlibd .decompress (self .DATA [100 :105 ], 15 ))
1056
+
1057
+ # Decompress rest of data
1058
+ out .append (zlibd .decompress (self .DATA [105 :]))
1059
+ self .assertEqual (b'' .join (out ), self .TEXT )
1060
+
1061
+ def test_decompressor_inputbuf_2 (self ):
1062
+ # Test reusing input buffer by appending data at the
1063
+ # end right away
1064
+ zlibd = zlib ._ZlibDecompressor ()
1065
+ out = []
1066
+
1067
+ # Create input buffer and empty it
1068
+ self .assertEqual (zlibd .decompress (self .DATA [:200 ],
1069
+ max_length = 0 ), b'' )
1070
+ out .append (zlibd .decompress (b'' ))
1071
+
1072
+ # Fill buffer with new data
1073
+ out .append (zlibd .decompress (self .DATA [200 :280 ], 2 ))
1074
+
1075
+ # Append some more data, not enough to require resize
1076
+ out .append (zlibd .decompress (self .DATA [280 :300 ], 2 ))
1077
+
1078
+ # Decompress rest of data
1079
+ out .append (zlibd .decompress (self .DATA [300 :]))
1080
+ self .assertEqual (b'' .join (out ), self .TEXT )
1081
+
1082
+ def test_decompressor_inputbuf_3 (self ):
1083
+ # Test reusing input buffer after extending it
1084
+
1085
+ zlibd = zlib ._ZlibDecompressor ()
1086
+ out = []
1087
+
1088
+ # Create almost full input buffer
1089
+ out .append (zlibd .decompress (self .DATA [:200 ], 5 ))
1090
+
1091
+ # Add even more data to it, requiring resize
1092
+ out .append (zlibd .decompress (self .DATA [200 :300 ], 5 ))
1093
+
1094
+ # Decompress rest of data
1095
+ out .append (zlibd .decompress (self .DATA [300 :]))
1096
+ self .assertEqual (b'' .join (out ), self .TEXT )
1097
+
1098
+ def test_failure (self ):
1099
+ zlibd = zlib ._ZlibDecompressor ()
1100
+ self .assertRaises (Exception , zlibd .decompress , self .BAD_DATA * 30 )
1101
+ # Previously, a second call could crash due to internal inconsistency
1102
+ self .assertRaises (Exception , zlibd .decompress , self .BAD_DATA * 30 )
1103
+
1104
+ @support .refcount_test
1105
+ def test_refleaks_in___init__ (self ):
1106
+ gettotalrefcount = support .get_attribute (sys , 'gettotalrefcount' )
1107
+ zlibd = zlib ._ZlibDecompressor ()
1108
+ refs_before = gettotalrefcount ()
1109
+ for i in range (100 ):
1110
+ zlibd .__init__ ()
1111
+ self .assertAlmostEqual (gettotalrefcount () - refs_before , 0 , delta = 10 )
1112
+
1113
+
947
1114
class CustomInt :
948
1115
def __index__ (self ):
949
1116
return 100
0 commit comments