Skip to content
Merged
44 changes: 40 additions & 4 deletions flow/connectors/mysql/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
_ "github.com/pingcap/tidb/pkg/types/parser_driver"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/text/encoding"
"google.golang.org/protobuf/proto"

"github.com/PeerDB-io/peerdb/flow/connectors/utils"
Expand Down Expand Up @@ -596,6 +597,41 @@ func (c *MySqlConnector) PullRecords(
enumMap := ev.Table.EnumStrValueMap()
setMap := ev.Table.SetStrValueMap()

// Binlog row events carry string/ENUM/SET bytes verbatim in each column's
// own character set; unlike the snapshot path (SET NAMES utf8mb4) the server
// does no transcoding here. Resolve a decoder per column from the TABLE_MAP
// collation metadata so non-utf8 columns (e.g. latin1/gbk/sjis) reach the
// destination as valid UTF-8 instead of mojibake. nil = already UTF-8, no-op.
colEncodings := make(map[int]encoding.Encoding)
for colIdx, collationID := range ev.Table.CollationMap() {
enc, err := c.collationEncoding(ctx, collationID)
if err != nil {
return err
}
colEncodings[colIdx] = enc
}
Comment thread
Copilot marked this conversation as resolved.
Outdated
// ENUM/SET label values themselves are stored in the column's charset, so
// transcode the decoded label tables in place before they are emitted.
for colIdx, collationID := range ev.Table.EnumSetCollationMap() {
enc, err := c.collationEncoding(ctx, collationID)
if err != nil {
return err
}
if enc == nil {
continue
}
colEncodings[colIdx] = enc
for labels := range slices.Values([][]string{enumMap[colIdx], setMap[colIdx]}) {
for i, label := range labels {
decoded, err := decodeMySQLString(enc, label)
if err != nil {
return err
}
labels[i] = decoded
}
}
}

// Process TABLE_MAP_EVENT schema to detect new columns
var fields []*protos.FieldDescription
if ev.Table.ColumnName != nil {
Expand Down Expand Up @@ -635,7 +671,7 @@ func (c *MySqlConnector) PullRecords(
continue
}
val, err := QValueFromMysqlRowEvent(ev.Table, idx, enumMap[idx], setMap[idx],
types.QValueKind(fd.Type), val, c.logger, &coercionReported)
types.QValueKind(fd.Type), val, colEncodings[idx], c.logger, &coercionReported)
if err != nil {
return err
}
Expand Down Expand Up @@ -672,7 +708,7 @@ func (c *MySqlConnector) PullRecords(
continue
}
val, err := QValueFromMysqlRowEvent(ev.Table, idx, enumMap[idx], setMap[idx],
types.QValueKind(fd.Type), val, c.logger, &coercionReported)
types.QValueKind(fd.Type), val, colEncodings[idx], c.logger, &coercionReported)
if err != nil {
return err
}
Expand All @@ -686,7 +722,7 @@ func (c *MySqlConnector) PullRecords(
continue
}
val, err := QValueFromMysqlRowEvent(ev.Table, idx, enumMap[idx], setMap[idx],
types.QValueKind(fd.Type), val, c.logger, &coercionReported)
types.QValueKind(fd.Type), val, colEncodings[idx], c.logger, &coercionReported)
if err != nil {
return err
}
Expand Down Expand Up @@ -724,7 +760,7 @@ func (c *MySqlConnector) PullRecords(
continue
}
val, err := QValueFromMysqlRowEvent(ev.Table, idx, enumMap[idx], setMap[idx],
types.QValueKind(fd.Type), val, c.logger, &coercionReported)
types.QValueKind(fd.Type), val, colEncodings[idx], c.logger, &coercionReported)
if err != nil {
return err
}
Expand Down
182 changes: 182 additions & 0 deletions flow/connectors/mysql/charset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package connmysql

import (
"context"
"fmt"
"log/slog"

"golang.org/x/text/encoding"
"golang.org/x/text/encoding/charmap"
"golang.org/x/text/encoding/japanese"
"golang.org/x/text/encoding/korean"
"golang.org/x/text/encoding/simplifiedchinese"
"golang.org/x/text/encoding/traditionalchinese"
"golang.org/x/text/encoding/unicode"
"golang.org/x/text/encoding/unicode/utf32"
"golang.org/x/text/transform"
)

// charsetsNoTranscode is the set of MySQL character set names for which
// collationEncoding reports a nil encoding: their binlog bytes are forwarded
// unchanged because they are either already UTF-8 compatible or opaque binary
// data. A nil encoding makes decodeMySQLBytes/decodeMySQLString return their
// input verbatim.
var charsetsNoTranscode = map[string]struct{}{
	// UTF-8 family (and its 7-bit subset).
	"ascii":   {},
	"utf8":    {},
	"utf8mb3": {},
	"utf8mb4": {},

	// Raw bytes with no character semantics.
	"binary": {},
}

// mysqlCharsetEncodings maps a MySQL character set name to the golang.org/x/text
// encoding used to decode its bytes into UTF-8.
//
// Caveat worth remembering: MySQL's "latin1" is Windows-1252, NOT ISO-8859-1 —
// it assigns printable characters to the 0x80-0x9F range that ISO-8859-1 leaves
// as C1 control codes. Decoding latin1 as ISO-8859-1 silently mangles smart
// quotes, the euro sign, etc., so we deliberately use charmap.Windows1252.
//
// MySQL's "utf16"/"ucs2" are big-endian; "utf16le" is little-endian; "utf32" is
// big-endian. We ignore any BOM since binlog column data carries none.
var mysqlCharsetEncodings = map[string]encoding.Encoding{

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

List known unsupported as well: armscii8, dec8, geostd8, hp8, keybcs2, macce, swe7 (all rare enough until we get a signal otherwise)

// Single-byte / Windows & ISO code pages.
"latin1": charmap.Windows1252,
"latin2": charmap.ISO8859_2,
"latin5": charmap.Windows1254,
Comment thread
dtunikov marked this conversation as resolved.
Outdated
"latin7": charmap.ISO8859_13,
"cp1250": charmap.Windows1250,
"cp1251": charmap.Windows1251,
"cp1256": charmap.Windows1256,
"cp1257": charmap.Windows1257,
"cp850": charmap.CodePage850,
"cp852": charmap.CodePage852,
"cp866": charmap.CodePage866,
"koi8r": charmap.KOI8R,
"koi8u": charmap.KOI8U,
"greek": charmap.ISO8859_7,
"hebrew": charmap.ISO8859_8,
"tis620": charmap.Windows874,
"macroman": charmap.Macintosh,

// Multi-byte CJK code pages.
"gbk": simplifiedchinese.GBK,
"gb2312": simplifiedchinese.GBK, // GBK is a strict superset of GB2312/EUC-CN
"gb18030": simplifiedchinese.GB18030,
"big5": traditionalchinese.Big5,
"sjis": japanese.ShiftJIS,
"cp932": japanese.ShiftJIS, // cp932 is a near-superset of Shift-JIS
"ujis": japanese.EUCJP,
"eucjpms": japanese.EUCJP,
"euckr": korean.EUCKR,

// Wide Unicode encodings.
"utf16": unicode.UTF16(unicode.BigEndian, unicode.IgnoreBOM),
"utf16le": unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM),
"ucs2": unicode.UTF16(unicode.BigEndian, unicode.IgnoreBOM),
"utf32": utf32.UTF32(utf32.BigEndian, utf32.IgnoreBOM),
}

// collationEncoding resolves a MySQL collation id (as carried in binlog
// TABLE_MAP metadata) to the x/text encoding needed to convert that column's
// bytes to UTF-8. It returns (nil, nil) when no transcoding is required, i.e.
// the column is already UTF-8/ascii/binary, the collation is unknown, or the
// charset is one we cannot transcode (in which case we warn once and fall back
// to passing the raw bytes through, matching pre-existing behavior).
func (c *MySqlConnector) collationEncoding(ctx context.Context, collationID uint64) (encoding.Encoding, error) {
if collationID == 0 {
return nil, nil
}

charset, err := c.charsetForCollation(ctx, collationID)
Comment thread
dtunikov marked this conversation as resolved.
if err != nil {
return nil, err
}
if charset == "" {
c.warnCharsetOnce(fmt.Sprintf("collation:%d", collationID), func() {
c.logger.Warn("unknown MySQL collation id on CDC path, passing bytes through untranscoded",
slog.Uint64("collationID", collationID))
})
return nil, nil
}

if _, skip := charsetsNoTranscode[charset]; skip {
return nil, nil
}
if enc, ok := mysqlCharsetEncodings[charset]; ok {
return enc, nil
}

c.warnCharsetOnce(charset, func() {
c.logger.Warn("unsupported MySQL character set on CDC path, passing bytes through untranscoded",
slog.String("charset", charset), slog.Uint64("collationID", collationID))
})
return nil, nil
}

// charsetForCollation maps a collation id to its character set name, lazily
// loading (and caching) the full table from information_schema on first use.
func (c *MySqlConnector) charsetForCollation(ctx context.Context, collationID uint64) (string, error) {
Comment thread
dtunikov marked this conversation as resolved.
if m := c.collationCharset.Load(); m != nil {
Comment thread
dtunikov marked this conversation as resolved.
return (*m)[collationID], nil
}

m, err := c.loadCollationCharsetMap(ctx)
if err != nil {
return "", err
}
c.collationCharset.Store(&m)
return m[collationID], nil
}

// loadCollationCharsetMap queries information_schema.COLLATIONS and returns a
// map from collation id to character set name. Any query or row-decoding
// failure aborts the load and is returned wrapped; no partial map is returned.
func (c *MySqlConnector) loadCollationCharsetMap(ctx context.Context) (map[uint64]string, error) {
	result, err := c.Execute(ctx, "SELECT ID, CHARACTER_SET_NAME FROM information_schema.COLLATIONS")
	if err != nil {
		return nil, fmt.Errorf("failed to load collation charset map: %w", err)
	}

	rowCount := result.RowNumber()
	byCollation := make(map[uint64]string, rowCount)
	for row := range rowCount {
		// Column 0 is ID, column 1 is CHARACTER_SET_NAME (see the SELECT above).
		collationID, err := result.GetInt(row, 0)
		if err != nil {
			return nil, fmt.Errorf("failed to read collation id: %w", err)
		}
		name, err := result.GetString(row, 1)
		if err != nil {
			return nil, fmt.Errorf("failed to read collation charset name: %w", err)
		}
		byCollation[uint64(collationID)] = name
	}
	return byCollation, nil
}

// warnCharsetOnce invokes warn the first time it is called with a given key and
// does nothing on later calls with the same key. Keys are remembered in
// c.warnedCharsets.
func (c *MySqlConnector) warnCharsetOnce(key string, warn func()) {
	_, alreadyWarned := c.warnedCharsets.LoadOrStore(key, struct{}{})
	if alreadyWarned {
		return
	}
	warn()
}

// decodeMySQLBytes returns b as a UTF-8 string, running it through enc's
// decoder first. A nil enc means no transcoding is needed and the bytes are
// converted to a string as-is. A decoder failure is returned wrapped, with an
// empty string.
func decodeMySQLBytes(enc encoding.Encoding, b []byte) (string, error) {
	if enc == nil {
		return string(b), nil
	}

	decoder := enc.NewDecoder()
	utf8Bytes, _, err := transform.Bytes(decoder, b)
	if err != nil {
		return "", fmt.Errorf("failed to transcode column bytes to UTF-8: %w", err)
	}
	return string(utf8Bytes), nil
}

// decodeMySQLString is decodeMySQLBytes for input already held as a string:
// s is returned untouched when enc is nil, otherwise it is run through enc's
// decoder. A decoder failure is returned wrapped, with an empty string.
func decodeMySQLString(enc encoding.Encoding, s string) (string, error) {
	if enc == nil {
		return s, nil
	}

	decoder := enc.NewDecoder()
	utf8Str, _, err := transform.String(decoder, s)
	if err != nil {
		return "", fmt.Errorf("failed to transcode column string to UTF-8: %w", err)
	}
	return utf8Str, nil
}
64 changes: 64 additions & 0 deletions flow/connectors/mysql/charset_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package connmysql

import (
"testing"

"github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
"github.com/stretchr/testify/require"
"go.temporal.io/sdk/log"

"github.com/PeerDB-io/peerdb/flow/shared/types"
)

// TestDecodeMySQLBytes feeds decodeMySQLBytes raw column bytes in several MySQL
// character sets and checks the UTF-8 result. The encoding for each case is
// looked up in mysqlCharsetEncodings by charset name.
func TestDecodeMySQLBytes(t *testing.T) {
	type decodeCase struct {
		name    string
		charset string
		in      []byte
		want    string
	}

	cases := []decodeCase{
		// MySQL latin1 is Windows-1252: 0x80 is the euro sign, 0xE9 is é.
		{name: "latin1 accented", charset: "latin1", in: []byte{0x63, 0x61, 0x66, 0xE9}, want: "café"},
		{name: "latin1 euro", charset: "latin1", in: []byte{0x80}, want: "€"},
		// gbk: 0xC4E3 0xBAC3 = 你好
		{name: "gbk hello", charset: "gbk", in: []byte{0xC4, 0xE3, 0xBA, 0xC3}, want: "你好"},
		// sjis: 0x83 0x4F = グ
		{name: "sjis kana", charset: "sjis", in: []byte{0x83, 0x4F}, want: "グ"},
		// euckr: 0xBEC8 0xB3E7 = 안녕
		{name: "euckr hangul", charset: "euckr", in: []byte{0xBE, 0xC8, 0xB3, 0xE7}, want: "안녕"},
		// utf8mb4 and ascii have no table entry, so they decode with a nil encoding.
		{name: "utf8mb4 passthrough", charset: "utf8mb4", in: []byte("café"), want: "café"},
		{name: "ascii passthrough", charset: "ascii", in: []byte("plain"), want: "plain"},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// A charset missing from the table yields a nil encoding -> passthrough.
			decoder := mysqlCharsetEncodings[tc.charset]
			got, err := decodeMySQLBytes(decoder, tc.in)
			require.NoError(t, err)
			require.Equal(t, tc.want, got)
		})
	}
}

// TestQValueFromMysqlRowEvent_Transcodes verifies that the CDC decode path applies
// the column's charset decoder, so a latin1 column no longer reaches the destination
// as invalid-UTF-8 mojibake (the bug in DBI-810).
func TestQValueFromMysqlRowEvent_Transcodes(t *testing.T) {
ev := &replication.TableMapEvent{ColumnType: []byte{mysql.MYSQL_TYPE_VARCHAR}}
logger := log.NewStructuredLogger(nil)
var coercionReported bool

latin1 := []byte{0x63, 0x61, 0x66, 0xE9} // "café" in latin1

// Without a decoder (utf8mb4 column) the bytes are reinterpreted verbatim - mojibake.
raw, err := QValueFromMysqlRowEvent(
ev, 0, nil, nil, types.QValueKindString, latin1, nil, logger, &coercionReported)
require.NoError(t, err)
require.NotEqual(t, "café", raw.(types.QValueString).Val)

// With the latin1 decoder the value is correctly transcoded.
decoded, err := QValueFromMysqlRowEvent(
ev, 0, nil, nil, types.QValueKindString, latin1, mysqlCharsetEncodings["latin1"], logger, &coercionReported)
require.NoError(t, err)
require.Equal(t, "café", decoded.(types.QValueString).Val)
}
7 changes: 7 additions & 0 deletions flow/connectors/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"log/slog"
"net"
"strings"
"sync"
"sync/atomic"
"time"

Expand All @@ -29,13 +30,19 @@

type MySqlConnector struct {
*metadataStore.PostgresMetadata
config *protos.MySqlConfig

Check failure on line 33 in flow/connectors/mysql/mysql.go

View workflow job for this annotation

GitHub Actions / lint

File is not properly formatted (gci)
ssh *utils.SSHTunnel
conn atomic.Pointer[client.Conn] // atomic used for internal concurrency, connector interface is not threadsafe
contexts atomic.Pointer[chan context.Context]
logger log.Logger
rdsAuth *utils.RDSAuth
serverVersion string
// collationCharset caches information_schema.COLLATIONS (collation id -> charset
// name), loaded lazily the first time the CDC path needs to transcode a column.
collationCharset atomic.Pointer[map[uint64]string]
// warnedCharsets dedupes the "unsupported/unknown charset" warnings so a stream
// of non-transcodable rows does not flood the logs.
warnedCharsets sync.Map
Comment thread
dtunikov marked this conversation as resolved.
binlogHeartbeatPeriod time.Duration
totalBytesRead atomic.Int64
deltaBytesRead atomic.Int64
Expand Down
Loading
Loading