-
Notifications
You must be signed in to change notification settings - Fork 198
Expand file tree
/
Copy pathcharset.go
More file actions
164 lines (147 loc) · 4.95 KB
/
Copy pathcharset.go
File metadata and controls
164 lines (147 loc) · 4.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package connmysql
import (
"context"
"fmt"
"log/slog"
"golang.org/x/text/encoding"
"golang.org/x/text/encoding/charmap"
"golang.org/x/text/encoding/japanese"
"golang.org/x/text/encoding/korean"
"golang.org/x/text/encoding/simplifiedchinese"
"golang.org/x/text/encoding/traditionalchinese"
"golang.org/x/text/encoding/unicode"
"golang.org/x/text/encoding/unicode/utf32"
"golang.org/x/text/transform"
)
// charsetsNoTranscode is the set of MySQL character set names for which
// collationEncoding returns no encoding, so column bytes are passed through
// unchanged: the UTF-8 family and ASCII (stored bytes are already valid UTF-8)
// and binary (opaque bytes).
var charsetsNoTranscode = map[string]struct{}{
	"ascii":   {},
	"binary":  {},
	"utf8":    {},
	"utf8mb3": {},
	"utf8mb4": {},
}
// mysqlCharsetEncodings maps a MySQL character set name to the golang.org/x/text
// encoding whose decoder converts bytes in that character set to UTF-8.
// Names in charsetsNoTranscode do not appear here; collationEncoding checks that
// set first and only then consults this table.
var mysqlCharsetEncodings = map[string]encoding.Encoding{
	// Single-byte code pages (Windows, ISO 8859, DOS, KOI8, Mac).
	"cp1250":   charmap.Windows1250,
	"cp1251":   charmap.Windows1251,
	"cp1256":   charmap.Windows1256,
	"cp1257":   charmap.Windows1257,
	"cp850":    charmap.CodePage850,
	"cp852":    charmap.CodePage852,
	"cp866":    charmap.CodePage866,
	"greek":    charmap.ISO8859_7,
	"hebrew":   charmap.ISO8859_8,
	"koi8r":    charmap.KOI8R,
	"koi8u":    charmap.KOI8U,
	"latin1":   charmap.Windows1252,
	"latin2":   charmap.ISO8859_2,
	"latin5":   charmap.Windows1254,
	"latin7":   charmap.ISO8859_13,
	"macroman": charmap.Macintosh,
	"tis620":   charmap.Windows874,

	// Multi-byte CJK code pages.
	"big5":    traditionalchinese.Big5,
	"cp932":   japanese.ShiftJIS, // cp932 is a near-superset of Shift-JIS
	"eucjpms": japanese.EUCJP,
	"euckr":   korean.EUCKR,
	"gb18030": simplifiedchinese.GB18030,
	"gb2312":  simplifiedchinese.GBK, // GBK is a strict superset of GB2312/EUC-CN
	"gbk":     simplifiedchinese.GBK,
	"sjis":    japanese.ShiftJIS,
	"ujis":    japanese.EUCJP,

	// Wide Unicode encodings; every decoder here ignores a byte-order mark, and
	// all but utf16le read big-endian.
	"ucs2":    unicode.UTF16(unicode.BigEndian, unicode.IgnoreBOM),
	"utf16":   unicode.UTF16(unicode.BigEndian, unicode.IgnoreBOM),
	"utf16le": unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM),
	"utf32":   utf32.UTF32(utf32.BigEndian, utf32.IgnoreBOM),
}
// collationEncoding resolves a MySQL collation id (as carried in binlog
// TABLE_MAP metadata) to the x/text encoding needed to convert that column's
// bytes to UTF-8.
//
// It returns a nil encoding, meaning "pass the bytes through unchanged", when:
//   - collationID is 0;
//   - the id has no known character set (warned once per id);
//   - the character set is listed in charsetsNoTranscode;
//   - the character set has no entry in mysqlCharsetEncodings (warned once per
//     character set).
//
// A non-nil error is returned only when looking up the character set for the
// collation fails.
func (c *MySqlConnector) collationEncoding(ctx context.Context, collationID uint64) (encoding.Encoding, error) {
	if collationID == 0 {
		return nil, nil
	}

	name, err := c.charsetForCollation(ctx, collationID)
	if err != nil {
		return nil, err
	}

	// An empty name means the collation id was not found in the lookup map.
	if name == "" {
		onceKey := fmt.Sprintf("collation:%d", collationID)
		c.warnCharsetOnce(onceKey, func() {
			c.logger.Warn("unknown MySQL collation id on CDC path, passing bytes through untranscoded",
				slog.Uint64("collationID", collationID))
		})
		return nil, nil
	}

	if _, passthrough := charsetsNoTranscode[name]; passthrough {
		return nil, nil
	}

	enc, supported := mysqlCharsetEncodings[name]
	if !supported {
		c.warnCharsetOnce(name, func() {
			c.logger.Warn("unsupported MySQL character set on CDC path, passing bytes through untranscoded",
				slog.String("charset", name), slog.Uint64("collationID", collationID))
		})
		return nil, nil
	}
	return enc, nil
}
// charsetForCollation returns the character set name for collationID, or ""
// when the id is not present in the collation map.
//
// The map is read from c.collationCharset; if none has been stored yet it is
// loaded via loadCollationCharsetMap and stored for later calls. A failed load
// is returned as the error and nothing is stored, so the next call retries.
// NOTE(review): nothing here serializes the first load, so concurrent first
// callers may each run the query — confirm that is acceptable.
func (c *MySqlConnector) charsetForCollation(ctx context.Context, collationID uint64) (string, error) {
	cached := c.collationCharset.Load()
	if cached == nil {
		loaded, err := c.loadCollationCharsetMap(ctx)
		if err != nil {
			return "", err
		}
		cached = &loaded
		c.collationCharset.Store(cached)
	}
	return (*cached)[collationID], nil
}
// loadCollationCharsetMap queries information_schema.COLLATIONS and returns a
// map from collation ID to its CHARACTER_SET_NAME. Any query or column-read
// failure aborts the load and is returned wrapped; no partial map is returned.
func (c *MySqlConnector) loadCollationCharsetMap(ctx context.Context) (map[uint64]string, error) {
	rs, err := c.Execute(ctx, "SELECT ID, CHARACTER_SET_NAME FROM information_schema.COLLATIONS")
	if err != nil {
		return nil, fmt.Errorf("failed to load collation charset map: %w", err)
	}

	rowCount := rs.RowNumber()
	byID := make(map[uint64]string, rowCount)
	for row := range rowCount {
		// Column 0 is ID, column 1 is CHARACTER_SET_NAME (see the SELECT above).
		collationID, idErr := rs.GetInt(row, 0)
		if idErr != nil {
			return nil, fmt.Errorf("failed to read collation id: %w", idErr)
		}
		name, nameErr := rs.GetString(row, 1)
		if nameErr != nil {
			return nil, fmt.Errorf("failed to read collation charset name: %w", nameErr)
		}
		byID[uint64(collationID)] = name
	}
	return byID, nil
}
// warnCharsetOnce invokes warn only the first time it is called with a given
// key; the key is recorded in c.warnedCharsets and later calls with the same
// key do nothing.
func (c *MySqlConnector) warnCharsetOnce(key string, warn func()) {
	_, alreadyWarned := c.warnedCharsets.LoadOrStore(key, struct{}{})
	if alreadyWarned {
		return
	}
	warn()
}
// decodeMySQLBytes converts b, bytes stored in the column's character set, to
// a UTF-8 string using enc's decoder. A nil enc means no transcoding is needed
// and b is returned as a string unchanged. A decoding failure is returned
// wrapped, with an empty string.
func decodeMySQLBytes(enc encoding.Encoding, b []byte) (string, error) {
	if enc == nil {
		return string(b), nil
	}

	decoder := enc.NewDecoder()
	utf8Bytes, _, err := transform.Bytes(decoder, b)
	if err != nil {
		return "", fmt.Errorf("failed to transcode column bytes to UTF-8: %w", err)
	}
	return string(utf8Bytes), nil
}
// decodeMySQLString converts s, a string whose bytes are in the column's
// character set, to UTF-8 using enc's decoder. A nil enc means no transcoding
// is needed and s is returned unchanged. A decoding failure is returned
// wrapped, with an empty string.
func decodeMySQLString(enc encoding.Encoding, s string) (string, error) {
	if enc == nil {
		return s, nil
	}

	decoder := enc.NewDecoder()
	utf8Str, _, err := transform.String(decoder, s)
	if err != nil {
		return "", fmt.Errorf("failed to transcode column string to UTF-8: %w", err)
	}
	return utf8Str, nil
}