Skip to content
Merged
41 changes: 37 additions & 4 deletions flow/connectors/mysql/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
_ "github.com/pingcap/tidb/pkg/types/parser_driver"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/text/encoding"
"google.golang.org/protobuf/proto"

"github.com/PeerDB-io/peerdb/flow/connectors/utils"
Expand Down Expand Up @@ -606,6 +607,38 @@ func (c *MySqlConnector) PullRecords(
enumMap := ev.Table.EnumStrValueMap()
setMap := ev.Table.SetStrValueMap()

// build colIdx -> encoding map based on event collation map
colEncodings := make([]encoding.Encoding, len(ev.Table.ColumnType))

@dtunikov dtunikov Jun 16, 2026

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So here we allocate a slice for every row in CDC, which is not ideal, of course.
We could maybe cache it with something like map[columnName] -> encoding.Encoding.
In that case, though, we would need to handle column collation changes, and we need to check whether we can do that reliably.
I don't know whether the added complexity would be worth it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah remembering charsets on our end sounds like too much. We can maybe short-circuit the decoding for the common case though, where ev.Table.DefaultCharset and ColumnCharsets all fall under charsetsNoTranscode. E.g. return a nil slice from a helper and treat it specially

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added lazy allocation logic for this slice
now it gets allocated only when we encounter the first charset that needs to be transcoded

for colIdx, collationID := range ev.Table.CollationMap() {
if colIdx < 0 || colIdx >= len(colEncodings) {
continue
}
enc, err := c.collationEncoding(ctx, collationID)
if err != nil {
return err
}
colEncodings[colIdx] = enc
}
for colIdx, collationID := range ev.Table.EnumSetCollationMap() {
enc, err := c.collationEncoding(ctx, collationID)
if err != nil {
return err
}
if enc == nil {
continue
}
colEncodings[colIdx] = enc
for labels := range slices.Values([][]string{enumMap[colIdx], setMap[colIdx]}) {
for i, label := range labels {
decoded, err := decodeMySQLString(enc, label)
if err != nil {
return err
}
labels[i] = decoded
}
}
}

// Process TABLE_MAP_EVENT schema to detect new columns
var fields []*protos.FieldDescription
if ev.Table.ColumnName != nil {
Expand Down Expand Up @@ -645,7 +678,7 @@ func (c *MySqlConnector) PullRecords(
continue
}
val, err := QValueFromMysqlRowEvent(ev.Table, idx, enumMap[idx], setMap[idx],
types.QValueKind(fd.Type), val, c.logger, &coercionReported)
types.QValueKind(fd.Type), val, colEncodings[idx], c.logger, &coercionReported)
if err != nil {
return err
}
Expand Down Expand Up @@ -682,7 +715,7 @@ func (c *MySqlConnector) PullRecords(
continue
}
val, err := QValueFromMysqlRowEvent(ev.Table, idx, enumMap[idx], setMap[idx],
types.QValueKind(fd.Type), val, c.logger, &coercionReported)
types.QValueKind(fd.Type), val, colEncodings[idx], c.logger, &coercionReported)
if err != nil {
return err
}
Expand All @@ -696,7 +729,7 @@ func (c *MySqlConnector) PullRecords(
continue
}
val, err := QValueFromMysqlRowEvent(ev.Table, idx, enumMap[idx], setMap[idx],
types.QValueKind(fd.Type), val, c.logger, &coercionReported)
types.QValueKind(fd.Type), val, colEncodings[idx], c.logger, &coercionReported)
if err != nil {
return err
}
Expand Down Expand Up @@ -734,7 +767,7 @@ func (c *MySqlConnector) PullRecords(
continue
}
val, err := QValueFromMysqlRowEvent(ev.Table, idx, enumMap[idx], setMap[idx],
types.QValueKind(fd.Type), val, c.logger, &coercionReported)
types.QValueKind(fd.Type), val, colEncodings[idx], c.logger, &coercionReported)
if err != nil {
return err
}
Expand Down
164 changes: 164 additions & 0 deletions flow/connectors/mysql/charset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package connmysql

import (
"context"
"fmt"
"log/slog"

"golang.org/x/text/encoding"
"golang.org/x/text/encoding/charmap"
"golang.org/x/text/encoding/japanese"
"golang.org/x/text/encoding/korean"
"golang.org/x/text/encoding/simplifiedchinese"
"golang.org/x/text/encoding/traditionalchinese"
"golang.org/x/text/encoding/unicode"
"golang.org/x/text/encoding/unicode/utf32"
"golang.org/x/text/transform"
)

// charsetsNoTranscode is the set of MySQL character set names for which
// collationEncoding returns a nil encoding: their stored bytes are either
// already valid UTF-8 or opaque binary, so values pass through unchanged.
var charsetsNoTranscode = map[string]struct{}{
	"ascii":   {},
	"binary":  {},
	"utf8":    {},
	"utf8mb3": {},
	"utf8mb4": {},
}

// mysqlCharsetEncodings maps a MySQL character set name to the golang.org/x/text
var mysqlCharsetEncodings = map[string]encoding.Encoding{

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

List known unsupported as well: armscii8, dec8, geostd8, hp8, keybcs2, macce, swe7 (all rare enough until we get a signal otherwise)

// Single-byte / Windows & ISO code pages.
"latin1": charmap.Windows1252,
"latin2": charmap.ISO8859_2,
"latin5": charmap.Windows1254,
Comment thread
dtunikov marked this conversation as resolved.
Outdated
"latin7": charmap.ISO8859_13,
"cp1250": charmap.Windows1250,
"cp1251": charmap.Windows1251,
"cp1256": charmap.Windows1256,
"cp1257": charmap.Windows1257,
"cp850": charmap.CodePage850,
"cp852": charmap.CodePage852,
"cp866": charmap.CodePage866,
"koi8r": charmap.KOI8R,
"koi8u": charmap.KOI8U,
"greek": charmap.ISO8859_7,
"hebrew": charmap.ISO8859_8,
"tis620": charmap.Windows874,
"macroman": charmap.Macintosh,

// Multi-byte CJK code pages.
"gbk": simplifiedchinese.GBK,
"gb2312": simplifiedchinese.GBK, // GBK is a strict superset of GB2312/EUC-CN
"gb18030": simplifiedchinese.GB18030,
"big5": traditionalchinese.Big5,
"sjis": japanese.ShiftJIS,
"cp932": japanese.ShiftJIS, // cp932 is a near-superset of Shift-JIS
"ujis": japanese.EUCJP,
"eucjpms": japanese.EUCJP,
"euckr": korean.EUCKR,

// Wide Unicode encodings.
"utf16": unicode.UTF16(unicode.BigEndian, unicode.IgnoreBOM),
"utf16le": unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM),
"ucs2": unicode.UTF16(unicode.BigEndian, unicode.IgnoreBOM),
"utf32": utf32.UTF32(utf32.BigEndian, utf32.IgnoreBOM),
}

// collationEncoding resolves a MySQL collation id (as carried in binlog
// TABLE_MAP metadata) to the x/text encoding needed to convert that column's
// bytes to UTF-8.
func (c *MySqlConnector) collationEncoding(ctx context.Context, collationID uint64) (encoding.Encoding, error) {
if collationID == 0 {
return nil, nil
}

charset, err := c.charsetForCollation(ctx, collationID)
Comment thread
dtunikov marked this conversation as resolved.
if err != nil {
return nil, err
}
if charset == "" {
c.warnCharsetOnce(fmt.Sprintf("collation:%d", collationID), func() {
c.logger.Warn("unknown MySQL collation id on CDC path, passing bytes through untranscoded",
slog.Uint64("collationID", collationID))
})
return nil, nil
}

if _, skip := charsetsNoTranscode[charset]; skip {
return nil, nil
}
if enc, ok := mysqlCharsetEncodings[charset]; ok {
return enc, nil
}

c.warnCharsetOnce(charset, func() {
c.logger.Warn("unsupported MySQL character set on CDC path, passing bytes through untranscoded",
slog.String("charset", charset), slog.Uint64("collationID", collationID))
})
return nil, nil
}

func (c *MySqlConnector) charsetForCollation(ctx context.Context, collationID uint64) (string, error) {
Comment thread
dtunikov marked this conversation as resolved.
if m := c.collationCharset.Load(); m != nil {
Comment thread
dtunikov marked this conversation as resolved.
return (*m)[collationID], nil
}

m, err := c.loadCollationCharsetMap(ctx)
if err != nil {
return "", err
}
c.collationCharset.Store(&m)
return m[collationID], nil
}

// loadCollationCharsetMap queries information_schema.COLLATIONS and builds a
// map from each collation's ID to its CHARACTER_SET_NAME. It returns an error
// if the query fails or if either column of any row cannot be read.
func (c *MySqlConnector) loadCollationCharsetMap(ctx context.Context) (map[uint64]string, error) {
	result, err := c.Execute(ctx, "SELECT ID, CHARACTER_SET_NAME FROM information_schema.COLLATIONS")
	if err != nil {
		return nil, fmt.Errorf("failed to load collation charset map: %w", err)
	}

	rowCount := result.RowNumber()
	byID := make(map[uint64]string, rowCount)
	for row := range rowCount {
		collationID, err := result.GetInt(row, 0)
		if err != nil {
			return nil, fmt.Errorf("failed to read collation id: %w", err)
		}
		name, err := result.GetString(row, 1)
		if err != nil {
			return nil, fmt.Errorf("failed to read collation charset name: %w", err)
		}
		byID[uint64(collationID)] = name
	}
	return byID, nil
}

// warnCharsetOnce invokes warn only the first time key is seen.
// c.warnedCharsets records the keys that have already triggered a warning.
func (c *MySqlConnector) warnCharsetOnce(key string, warn func()) {
	_, alreadyWarned := c.warnedCharsets.LoadOrStore(key, struct{}{})
	if alreadyWarned {
		return
	}
	warn()
}

// decodeMySQLBytes returns b as a UTF-8 string, decoding it from the column's
// character set with enc. A nil enc means no transcoding is needed, and b is
// converted to a string verbatim.
func decodeMySQLBytes(enc encoding.Encoding, b []byte) (string, error) {
	if enc == nil {
		return string(b), nil
	}

	decoder := enc.NewDecoder()
	utf8Bytes, _, err := transform.Bytes(decoder, b)
	if err != nil {
		return "", fmt.Errorf("failed to transcode column bytes to UTF-8: %w", err)
	}
	return string(utf8Bytes), nil
}

// decodeMySQLString returns s converted to UTF-8, decoding it from the
// column's character set with enc. A nil enc means no transcoding is needed,
// and s is returned unchanged.
func decodeMySQLString(enc encoding.Encoding, s string) (string, error) {
	if enc == nil {
		return s, nil
	}

	decoder := enc.NewDecoder()
	utf8Str, _, err := transform.String(decoder, s)
	if err != nil {
		return "", fmt.Errorf("failed to transcode column string to UTF-8: %w", err)
	}
	return utf8Str, nil
}
63 changes: 63 additions & 0 deletions flow/connectors/mysql/charset_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package connmysql

import (
"testing"

"github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
"github.com/stretchr/testify/require"
"go.temporal.io/sdk/log"

"github.com/PeerDB-io/peerdb/flow/shared/types"
)

func TestDecodeMySQLBytes(t *testing.T) {
for _, tc := range []struct {
name string
charset string
in []byte
want string
}{
// MySQL latin1 is Windows-1252: 0x80 is the euro sign, 0xE9 is é.
{"latin1 accented", "latin1", []byte{0x63, 0x61, 0x66, 0xE9}, "café"},
{"latin1 euro", "latin1", []byte{0x80}, "€"},
// gbk: 0xC4E3 0xBAC3 = 你好
{"gbk hello", "gbk", []byte{0xC4, 0xE3, 0xBA, 0xC3}, "你好"},
// sjis: 0x83 0x4F = グ
{"sjis kana", "sjis", []byte{0x83, 0x4F}, "グ"},
// euckr: 0xBEC8 0xB3E7 = 안녕
{"euckr hangul", "euckr", []byte{0xBE, 0xC8, 0xB3, 0xE7}, "안녕"},
// utf8mb4 passes through unchanged (nil encoding).
{"utf8mb4 passthrough", "utf8mb4", []byte("café"), "café"},
{"ascii passthrough", "ascii", []byte("plain"), "plain"},
} {
t.Run(tc.name, func(t *testing.T) {
enc := mysqlCharsetEncodings[tc.charset] // nil for utf8mb4/ascii -> passthrough
got, err := decodeMySQLBytes(enc, tc.in)
require.NoError(t, err)
require.Equal(t, tc.want, got)
})
}
}

// TestQValueFromMysqlRowEvent_Transcodes verifies that QValueFromMysqlRowEvent
// applies the supplied charset encoding to string column values on the CDC
// decode path.
func TestQValueFromMysqlRowEvent_Transcodes(t *testing.T) {
	const want = "café"

	var coercionReported bool
	tableMap := &replication.TableMapEvent{ColumnType: []byte{mysql.MYSQL_TYPE_VARCHAR}}
	logger := log.NewStructuredLogger(nil)
	payload := []byte{0x63, 0x61, 0x66, 0xE9} // "café" encoded as latin1

	// With a nil encoding the latin1 bytes are taken verbatim, so the result
	// is mojibake rather than the expected text.
	untranscoded, err := QValueFromMysqlRowEvent(
		tableMap, 0, nil, nil, types.QValueKindString, payload, nil, logger, &coercionReported)
	require.NoError(t, err)
	require.NotEqual(t, want, untranscoded.(types.QValueString).Val)

	// With the latin1 encoding supplied the value is transcoded correctly.
	transcoded, err := QValueFromMysqlRowEvent(
		tableMap, 0, nil, nil, types.QValueKindString, payload, mysqlCharsetEncodings["latin1"], logger, &coercionReported)
	require.NoError(t, err)
	require.Equal(t, want, transcoded.(types.QValueString).Val)
}
3 changes: 3 additions & 0 deletions flow/connectors/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"log/slog"
"net"
"strings"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -36,6 +37,8 @@ type MySqlConnector struct {
logger log.Logger
rdsAuth *utils.RDSAuth
serverVersion string
collationCharset atomic.Pointer[map[uint64]string]
warnedCharsets sync.Map
Comment thread
dtunikov marked this conversation as resolved.
binlogHeartbeatPeriod time.Duration
totalBytesRead atomic.Int64
deltaBytesRead atomic.Int64
Expand Down
28 changes: 23 additions & 5 deletions flow/connectors/mysql/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/shopspring/decimal"
geom "github.com/twpayne/go-geos"
"go.temporal.io/sdk/log"
"golang.org/x/text/encoding"

"github.com/PeerDB-io/peerdb/flow/generated/protos"
"github.com/PeerDB-io/peerdb/flow/pkg/common"
Expand Down Expand Up @@ -369,7 +370,7 @@ func binaryColumnLength(mytype byte, meta uint16) int {
func QValueFromMysqlRowEvent(
ev *replication.TableMapEvent, idx int,
enums []string, sets []string,
qkind types.QValueKind, val any, logger log.Logger, coercionReported *bool,
qkind types.QValueKind, val any, enc encoding.Encoding, logger log.Logger, coercionReported *bool,
) (types.QValue, error) {
mytype := ev.ColumnType[idx]

Expand Down Expand Up @@ -507,10 +508,19 @@ func QValueFromMysqlRowEvent(
case types.QValueKindBytes:
return types.QValueBytes{Val: val}, nil
case types.QValueKindString:
return types.QValueString{Val: string(val)}, nil
s, err := decodeMySQLBytes(enc, val)
if err != nil {
return nil, err
}
return types.QValueString{Val: s}, nil
case types.QValueKindEnum:
return types.QValueEnum{Val: string(val)}, nil
s, err := decodeMySQLBytes(enc, val)
if err != nil {
return nil, err
}
return types.QValueEnum{Val: s}, nil
case types.QValueKindJSON:
// MySQL always stores JSON internally as utf8mb4, so it never needs transcoding.
return types.QValueJSON{Val: string(val)}, nil
case types.QValueKindGeometry:
// Handle geometry data as binary (WKB format)
Expand All @@ -533,9 +543,17 @@ func QValueFromMysqlRowEvent(
}
return types.QValueBytes{Val: b}, nil
case types.QValueKindString:
return types.QValueString{Val: val}, nil
s, err := decodeMySQLString(enc, val)
if err != nil {
return nil, err
}
return types.QValueString{Val: s}, nil
case types.QValueKindEnum:
return types.QValueEnum{Val: val}, nil
s, err := decodeMySQLString(enc, val)
if err != nil {
return nil, err
}
return types.QValueEnum{Val: s}, nil
case types.QValueKindJSON:
return types.QValueJSON{Val: val}, nil
case types.QValueKindTime:
Expand Down
Loading
Loading