Skip to content
Merged
58 changes: 54 additions & 4 deletions flow/connectors/mysql/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
_ "github.com/pingcap/tidb/pkg/types/parser_driver"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/text/encoding"
"google.golang.org/protobuf/proto"

"github.com/PeerDB-io/peerdb/flow/connectors/utils"
Expand Down Expand Up @@ -606,6 +607,55 @@ func (c *MySqlConnector) PullRecords(
enumMap := ev.Table.EnumStrValueMap()
setMap := ev.Table.SetStrValueMap()

// build colIdx -> encoding map based on event collation map.
// Allocated lazily: tables whose character columns are all utf8/ascii/binary
// (the common case) resolve every collation to a nil encoding and never allocate.
var colEncodings []encoding.Encoding
encFor := func(idx int) encoding.Encoding {
if idx >= 0 && idx < len(colEncodings) {
return colEncodings[idx]
}
return nil
}
setColEncoding := func(colIdx int, enc encoding.Encoding) {
if colEncodings == nil {
colEncodings = make([]encoding.Encoding, len(ev.Table.ColumnType))
}
colEncodings[colIdx] = enc
}
for colIdx, collationID := range ev.Table.CollationMap() {
if colIdx < 0 || colIdx >= len(ev.Table.ColumnType) {
continue
}
enc, err := c.collationEncoding(ctx, collationID, otelManager)
if err != nil {
return err
}
if enc == nil {
continue
}
setColEncoding(colIdx, enc)
}
for colIdx, collationID := range ev.Table.EnumSetCollationMap() {
enc, err := c.collationEncoding(ctx, collationID, otelManager)
if err != nil {
return err
}
if enc == nil {
continue
}
setColEncoding(colIdx, enc)
for labels := range slices.Values([][]string{enumMap[colIdx], setMap[colIdx]}) {
for i, label := range labels {
decoded, err := decodeMySQLString(enc, label)
if err != nil {
return err
}
labels[i] = decoded
}
}
}

// Process TABLE_MAP_EVENT schema to detect new columns
var fields []*protos.FieldDescription
if ev.Table.ColumnName != nil {
Expand Down Expand Up @@ -645,7 +695,7 @@ func (c *MySqlConnector) PullRecords(
continue
}
val, err := QValueFromMysqlRowEvent(ev.Table, idx, enumMap[idx], setMap[idx],
types.QValueKind(fd.Type), val, c.logger, &coercionReported)
types.QValueKind(fd.Type), val, encFor(idx), c.logger, &coercionReported)
if err != nil {
return err
}
Expand Down Expand Up @@ -682,7 +732,7 @@ func (c *MySqlConnector) PullRecords(
continue
}
val, err := QValueFromMysqlRowEvent(ev.Table, idx, enumMap[idx], setMap[idx],
types.QValueKind(fd.Type), val, c.logger, &coercionReported)
types.QValueKind(fd.Type), val, encFor(idx), c.logger, &coercionReported)
if err != nil {
return err
}
Expand All @@ -696,7 +746,7 @@ func (c *MySqlConnector) PullRecords(
continue
}
val, err := QValueFromMysqlRowEvent(ev.Table, idx, enumMap[idx], setMap[idx],
types.QValueKind(fd.Type), val, c.logger, &coercionReported)
types.QValueKind(fd.Type), val, encFor(idx), c.logger, &coercionReported)
if err != nil {
return err
}
Expand Down Expand Up @@ -734,7 +784,7 @@ func (c *MySqlConnector) PullRecords(
continue
}
val, err := QValueFromMysqlRowEvent(ev.Table, idx, enumMap[idx], setMap[idx],
types.QValueKind(fd.Type), val, c.logger, &coercionReported)
types.QValueKind(fd.Type), val, encFor(idx), c.logger, &coercionReported)
if err != nil {
return err
}
Expand Down
183 changes: 183 additions & 0 deletions flow/connectors/mysql/charset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package connmysql

import (
"context"
"fmt"
"log/slog"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"golang.org/x/text/encoding"
"golang.org/x/text/encoding/charmap"
"golang.org/x/text/encoding/japanese"
"golang.org/x/text/encoding/korean"
"golang.org/x/text/encoding/simplifiedchinese"
"golang.org/x/text/encoding/traditionalchinese"
"golang.org/x/text/encoding/unicode"
"golang.org/x/text/encoding/unicode/utf32"
"golang.org/x/text/transform"

"github.com/PeerDB-io/peerdb/flow/otel_metrics"
)

// charsetsNoTranscode lists the MySQL character sets whose stored bytes are
// already valid UTF-8 (or are opaque binary)
var charsetsNoTranscode = map[string]struct{}{
"utf8": {},
"utf8mb3": {},
"utf8mb4": {},
"ascii": {},
"binary": {},
}

// mysqlCharsetEncodings maps a MySQL character set name to the golang.org/x/text
var mysqlCharsetEncodings = map[string]encoding.Encoding{

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

List known unsupported as well: armscii8, dec8, geostd8, hp8, keybcs2, macce, swe7 (all rare enough until we get a signal otherwise)

// Single-byte / Windows & ISO code pages.
"latin1": charmap.Windows1252,
"latin2": charmap.ISO8859_2,
"latin5": charmap.ISO8859_9,
"latin7": charmap.ISO8859_13,
"cp1250": charmap.Windows1250,
"cp1251": charmap.Windows1251,
"cp1256": charmap.Windows1256,
"cp1257": charmap.Windows1257,
"cp850": charmap.CodePage850,
"cp852": charmap.CodePage852,
"cp866": charmap.CodePage866,
"koi8r": charmap.KOI8R,
"koi8u": charmap.KOI8U,
"greek": charmap.ISO8859_7,
"hebrew": charmap.ISO8859_8,
"tis620": charmap.Windows874,
"macroman": charmap.Macintosh,

// Multi-byte CJK code pages.
"gbk": simplifiedchinese.GBK,
"gb2312": simplifiedchinese.GBK, // GBK is a strict superset of GB2312/EUC-CN
"gb18030": simplifiedchinese.GB18030,
"big5": traditionalchinese.Big5,
"sjis": japanese.ShiftJIS,
"cp932": japanese.ShiftJIS, // cp932 is a near-superset of Shift-JIS
"ujis": japanese.EUCJP,
"eucjpms": japanese.EUCJP,
"euckr": korean.EUCKR,

// Wide Unicode encodings.
"utf16": unicode.UTF16(unicode.BigEndian, unicode.IgnoreBOM),
"utf16le": unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM),
"ucs2": unicode.UTF16(unicode.BigEndian, unicode.IgnoreBOM),
"utf32": utf32.UTF32(utf32.BigEndian, utf32.IgnoreBOM),
}

// collationEncoding resolves a MySQL collation id (as carried in binlog
// TABLE_MAP metadata) to the x/text encoding needed to convert that column's
// bytes to UTF-8.
func (c *MySqlConnector) collationEncoding(
ctx context.Context, collationID uint64, otelManager *otel_metrics.OtelManager,
) (encoding.Encoding, error) {
if collationID == 0 {
return nil, nil
}

recordUsedCharsetMetrics := func(ctx context.Context, charset string, status string) {
otelManager.Metrics.UsedMySQLCharsetsCounter.Add(ctx, 1,
metric.WithAttributeSet(attribute.NewSet(
attribute.String("charset", charset),
attribute.String("status", status),
)),
)
}

charset, err := c.charsetForCollation(ctx, collationID)
Comment thread
dtunikov marked this conversation as resolved.
if err != nil {
return nil, err
}
if charset == "" {
c.warnCharsetOnce(fmt.Sprintf("collation:%d", collationID), func() {
c.logger.Warn("unknown MySQL collation id on CDC path, passing bytes through untranscoded",
slog.Uint64("collationID", collationID))
})
return nil, nil
}

if _, skip := charsetsNoTranscode[charset]; skip {
recordUsedCharsetMetrics(ctx, charset, "not_transcoded")
return nil, nil
}
if enc, ok := mysqlCharsetEncodings[charset]; ok {
recordUsedCharsetMetrics(ctx, charset, "transcoded")
return enc, nil
}

recordUsedCharsetMetrics(ctx, charset, "unsupported")
c.warnCharsetOnce(charset, func() {
c.logger.Warn("unsupported MySQL character set on CDC path, passing bytes through untranscoded",
slog.String("charset", charset), slog.Uint64("collationID", collationID))
})

return nil, nil
}

func (c *MySqlConnector) charsetForCollation(ctx context.Context, collationID uint64) (string, error) {
Comment thread
dtunikov marked this conversation as resolved.
if m := c.collationCharset.Load(); m != nil {
Comment thread
dtunikov marked this conversation as resolved.
return (*m)[collationID], nil
}

m, err := c.loadCollationCharsetMap(ctx)
if err != nil {
return "", err
}
c.collationCharset.Store(&m)
return m[collationID], nil
}

func (c *MySqlConnector) loadCollationCharsetMap(ctx context.Context) (map[uint64]string, error) {
rs, err := c.Execute(ctx, "SELECT ID, CHARACTER_SET_NAME FROM information_schema.COLLATIONS")
if err != nil {
return nil, fmt.Errorf("failed to load collation charset map: %w", err)
}

m := make(map[uint64]string, rs.RowNumber())
for idx := range rs.RowNumber() {
id, err := rs.GetInt(idx, 0)
if err != nil {
return nil, fmt.Errorf("failed to read collation id: %w", err)
}
charset, err := rs.GetString(idx, 1)
if err != nil {
return nil, fmt.Errorf("failed to read collation charset name: %w", err)
}
m[uint64(id)] = charset
}
return m, nil
}

func (c *MySqlConnector) warnCharsetOnce(key string, warn func()) {
if _, loaded := c.warnedCharsets.LoadOrStore(key, struct{}{}); !loaded {
warn()
}
}

// decodeMySQLBytes converts bytes stored in the column's character set to UTF-8.
func decodeMySQLBytes(enc encoding.Encoding, b []byte) (string, error) {
if enc == nil {
return string(b), nil
}
out, _, err := transform.Bytes(enc.NewDecoder(), b)
if err != nil {
return "", fmt.Errorf("failed to transcode column bytes to UTF-8: %w", err)
}
return string(out), nil
}

// decodeMySQLBytes converts string stored in the column's character set to UTF-8.
func decodeMySQLString(enc encoding.Encoding, s string) (string, error) {
if enc == nil {
return s, nil
}
out, _, err := transform.String(enc.NewDecoder(), s)
if err != nil {
return "", fmt.Errorf("failed to transcode column string to UTF-8: %w", err)
}
return out, nil
}
63 changes: 63 additions & 0 deletions flow/connectors/mysql/charset_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package connmysql

import (
"testing"

"github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
"github.com/stretchr/testify/require"
"go.temporal.io/sdk/log"

"github.com/PeerDB-io/peerdb/flow/shared/types"
)

func TestDecodeMySQLBytes(t *testing.T) {
for _, tc := range []struct {
name string
charset string
in []byte
want string
}{
// MySQL latin1 is Windows-1252: 0x80 is the euro sign, 0xE9 is é.
{"latin1 accented", "latin1", []byte{0x63, 0x61, 0x66, 0xE9}, "café"},
{"latin1 euro", "latin1", []byte{0x80}, "€"},
// gbk: 0xC4E3 0xBAC3 = 你好
{"gbk hello", "gbk", []byte{0xC4, 0xE3, 0xBA, 0xC3}, "你好"},
// sjis: 0x83 0x4F = グ
{"sjis kana", "sjis", []byte{0x83, 0x4F}, "グ"},
// euckr: 0xBEC8 0xB3E7 = 안녕
{"euckr hangul", "euckr", []byte{0xBE, 0xC8, 0xB3, 0xE7}, "안녕"},
// utf8mb4 passes through unchanged (nil encoding).
{"utf8mb4 passthrough", "utf8mb4", []byte("café"), "café"},
{"ascii passthrough", "ascii", []byte("plain"), "plain"},
} {
t.Run(tc.name, func(t *testing.T) {
enc := mysqlCharsetEncodings[tc.charset] // nil for utf8mb4/ascii -> passthrough
got, err := decodeMySQLBytes(enc, tc.in)
require.NoError(t, err)
require.Equal(t, tc.want, got)
})
}
}

// TestQValueFromMysqlRowEvent_Transcodes verifies that the CDC decode path applies
// the column's charset decoder
func TestQValueFromMysqlRowEvent_Transcodes(t *testing.T) {
ev := &replication.TableMapEvent{ColumnType: []byte{mysql.MYSQL_TYPE_VARCHAR}}
logger := log.NewStructuredLogger(nil)
var coercionReported bool

latin1 := []byte{0x63, 0x61, 0x66, 0xE9} // "café" in latin1

// Without a decoder (utf8mb4 column) the bytes are reinterpreted verbatim - mojibake.
raw, err := QValueFromMysqlRowEvent(
ev, 0, nil, nil, types.QValueKindString, latin1, nil, logger, &coercionReported)
require.NoError(t, err)
require.NotEqual(t, "café", raw.(types.QValueString).Val)

// With the latin1 decoder the value is correctly transcoded.
decoded, err := QValueFromMysqlRowEvent(
ev, 0, nil, nil, types.QValueKindString, latin1, mysqlCharsetEncodings["latin1"], logger, &coercionReported)
require.NoError(t, err)
require.Equal(t, "café", decoded.(types.QValueString).Val)
}
3 changes: 3 additions & 0 deletions flow/connectors/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"log/slog"
"net"
"strings"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -36,6 +37,8 @@ type MySqlConnector struct {
logger log.Logger
rdsAuth *utils.RDSAuth
serverVersion string
collationCharset atomic.Pointer[map[uint64]string]
warnedCharsets sync.Map
Comment thread
dtunikov marked this conversation as resolved.
binlogHeartbeatPeriod time.Duration
totalBytesRead atomic.Int64
deltaBytesRead atomic.Int64
Expand Down
Loading
Loading