Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,14 @@

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.*;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.*;

public class LakeSoulTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite, SupportsRowLevelDelete, SupportsRowLevelUpdate {
public class LakeSoulTableSink implements DynamicTableSink, SupportsPartitioning,
SupportsOverwrite, SupportsRowLevelDelete, SupportsRowLevelUpdate {

private final String summaryName;
private final String tableName;
Expand Down Expand Up @@ -153,19 +156,20 @@ public void applyStaticPartition(Map<String, String> map) {

@Override
public RowLevelDeleteInfo applyRowLevelDelete(@Nullable RowLevelModificationScanContext context) {
if(flinkConf.getBoolean(USE_CDC, false)){
flinkConf.set(DMLTYPE,DELETE_CDC);
}else{
flinkConf.set(DMLTYPE,DELETE);
if (flinkConf.getBoolean(USE_CDC, false)) {
flinkConf.set(DMLTYPE, DELETE_CDC);
} else {
flinkConf.set(DMLTYPE, DELETE);
}

return new LakeSoulRowLevelDelete();
}

@Override
public RowLevelUpdateInfo applyRowLevelUpdate(List<Column> updatedColumns, @Nullable RowLevelModificationScanContext context) {
flinkConf.set(DMLTYPE,UPDATE);
return new LakeSoulRowLevelUpdate();
public RowLevelUpdateInfo applyRowLevelUpdate(List<Column> updatedColumns,
@Nullable RowLevelModificationScanContext context) {
flinkConf.set(DMLTYPE, UPDATE);
return new LakeSoulRowLevelUpdate();
}

private class LakeSoulRowLevelDelete implements RowLevelDeleteInfo {
Expand All @@ -175,9 +179,9 @@ public Optional<List<Column>> requiredColumns() {
}

public SupportsRowLevelDelete.RowLevelDeleteMode getRowLevelDeleteMode() {
if(flinkConf.getBoolean(USE_CDC, false)){
if (flinkConf.getBoolean(USE_CDC, false)) {
return RowLevelDeleteMode.DELETED_ROWS;
}else{
} else {
return RowLevelDeleteMode.REMAINING_ROWS;
}
}
Expand All @@ -190,9 +194,9 @@ public Optional<List<Column>> requiredColumns() {
}

public SupportsRowLevelUpdate.RowLevelUpdateMode getRowLevelUpdateMode() {
if (primaryKeyList.isEmpty()){
if (primaryKeyList.isEmpty()) {
return RowLevelUpdateMode.ALL_ROWS;
}else{
} else {
return SupportsRowLevelUpdate.RowLevelUpdateMode.UPDATED_ROWS;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,13 @@
import com.dmetasoul.lakesoul.meta.DBManager;
import com.dmetasoul.lakesoul.meta.DBUtil;
import com.dmetasoul.lakesoul.meta.entity.TableInfo;
import com.facebook.presto.common.type.IntegerType;
import com.facebook.presto.common.type.VarcharType;
import com.facebook.presto.lakesoul.handle.LakeSoulTableColumnHandle;
import com.facebook.presto.lakesoul.handle.LakeSoulTableHandle;
import com.facebook.presto.lakesoul.handle.LakeSoulTableLayoutHandle;
import com.facebook.presto.lakesoul.pojo.TableSchema;
import com.facebook.presto.lakesoul.util.JsonUtil;
import com.facebook.presto.lakesoul.util.PrestoUtil;
import com.facebook.presto.spi.*;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.google.common.collect.ImmutableList;
import com.facebook.presto.common.type.Type;
import org.apache.spark.sql.types.StructType;

import java.time.ZoneId;
Expand Down Expand Up @@ -52,9 +47,11 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTable
if (!listSchemaNames(session).contains(tableName.getSchemaName())) {
return null;
}
TableInfo tableInfo = dbManager.getTableInfoByNameAndNamespace(tableName.getTableName(), tableName.getSchemaName());
TableInfo
tableInfo =
dbManager.getTableInfoByNameAndNamespace(tableName.getTableName(), tableName.getSchemaName());

if(tableInfo == null) {
if (tableInfo == null) {
throw new RuntimeException("no such table: " + tableName);
}

Expand All @@ -80,13 +77,15 @@ public List<ConnectorTableLayoutResult> getTableLayouts(
org.apache.spark.sql.arrow.ArrowUtils.toArrowSchema(struct, ZoneId.of("UTC").toString());
HashMap<String, ColumnHandle> allColumns = new HashMap<>();
String cdcChangeColumn = properties.getString(CDC_CHANGE_COLUMN);
for( org.apache.arrow.vector.types.pojo.Field field: arrowSchema.getFields()){
for (org.apache.arrow.vector.types.pojo.Field field : arrowSchema.getFields()) {
// drop cdc change column
if(cdcChangeColumn != null && field.getName().equals(cdcChangeColumn)){
if (cdcChangeColumn != null && field.getName().equals(cdcChangeColumn)) {
continue;
}
LakeSoulTableColumnHandle columnHandle =
new LakeSoulTableColumnHandle(tableHandle, field.getName(), PrestoUtil.convertToPrestoType(field.getType()));
new LakeSoulTableColumnHandle(tableHandle,
field.getName(),
PrestoUtil.convertToPrestoType(field.getType()));
allColumns.put(field.getName(), columnHandle);
}
ConnectorTableLayout layout = new ConnectorTableLayout(
Expand Down Expand Up @@ -116,7 +115,7 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
}

TableInfo tableInfo = dbManager.getTableInfoByTableId(handle.getId());
if(tableInfo == null){
if (tableInfo == null) {
throw new RuntimeException("no such table: " + handle.getNames());
}
JSONObject properties = JSON.parseObject(tableInfo.getProperties());
Expand All @@ -126,13 +125,13 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect

List<ColumnMetadata> columns = new LinkedList<>();
String cdcChangeColumn = properties.getString(CDC_CHANGE_COLUMN);
for( org.apache.arrow.vector.types.pojo.Field field : arrowSchema.getFields()) {
for (org.apache.arrow.vector.types.pojo.Field field : arrowSchema.getFields()) {
Map<String, Object> props = new HashMap<>();
for(Map.Entry<String, String> entry : field.getMetadata().entrySet()){
for (Map.Entry<String, String> entry : field.getMetadata().entrySet()) {
props.put(entry.getKey(), entry.getValue());
}
// drop cdc change column
if(cdcChangeColumn != null && field.getName().equals(cdcChangeColumn)){
if (cdcChangeColumn != null && field.getName().equals(cdcChangeColumn)) {
continue;
}
ColumnMetadata columnMetadata = new ColumnMetadata(
Expand All @@ -152,14 +151,14 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
columns,
properties,
Optional.of("")
);
);
}

@Override
public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle) {
LakeSoulTableHandle table = (LakeSoulTableHandle) tableHandle;
TableInfo tableInfo = dbManager.getTableInfoByTableId(table.getId());
if(tableInfo == null){
if (tableInfo == null) {
throw new RuntimeException("no such table: " + table.getNames());
}
JSONObject properties = JSON.parseObject(tableInfo.getProperties());
Expand All @@ -168,35 +167,39 @@ public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, Conn
org.apache.spark.sql.arrow.ArrowUtils.toArrowSchema(struct, ZoneId.of("UTC").toString());
HashMap<String, ColumnHandle> map = new HashMap<>();
String cdcChangeColumn = properties.getString(CDC_CHANGE_COLUMN);
for( org.apache.arrow.vector.types.pojo.Field field: arrowSchema.getFields()){
for (org.apache.arrow.vector.types.pojo.Field field : arrowSchema.getFields()) {
// drop cdc change column
if(cdcChangeColumn != null && field.getName().equals(cdcChangeColumn)){
if (cdcChangeColumn != null && field.getName().equals(cdcChangeColumn)) {
continue;
}
LakeSoulTableColumnHandle columnHandle =
new LakeSoulTableColumnHandle(table, field.getName(), PrestoUtil.convertToPrestoType(field.getType()));
new LakeSoulTableColumnHandle(table,
field.getName(),
PrestoUtil.convertToPrestoType(field.getType()));
map.put(field.getName(), columnHandle);
}
return map;
}

@Override
public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle) {
public ColumnMetadata getColumnMetadata(ConnectorSession session,
ConnectorTableHandle tableHandle,
ColumnHandle columnHandle) {
LakeSoulTableColumnHandle handle = (LakeSoulTableColumnHandle) columnHandle;
TableInfo tableInfo = dbManager.getTableInfoByTableId(handle.getTableHandle().getId());
if(tableInfo == null){
if (tableInfo == null) {
throw new RuntimeException("no such table: " + handle.getTableHandle().getNames());
}

StructType struct = (StructType) org.apache.spark.sql.types.DataType.fromJson(tableInfo.getTableSchema());
org.apache.arrow.vector.types.pojo.Schema arrowSchema =
org.apache.spark.sql.arrow.ArrowUtils.toArrowSchema(struct, ZoneId.of("UTC").toString());
for( org.apache.arrow.vector.types.pojo.Field field: arrowSchema.getFields()){
for (org.apache.arrow.vector.types.pojo.Field field : arrowSchema.getFields()) {
Map<String, Object> properties = new HashMap<>();
for(Map.Entry<String, String> entry : field.getMetadata().entrySet()){
for (Map.Entry<String, String> entry : field.getMetadata().entrySet()) {
properties.put(entry.getKey(), entry.getValue());
}
if(field.getName().equals(handle.getColumnName())){
if (field.getName().equals(handle.getColumnName())) {
return new ColumnMetadata(
field.getName(),
PrestoUtil.convertToPrestoType(field.getType()),
Expand All @@ -213,14 +216,15 @@ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTable
}

@Override
public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix) {
public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session,
SchemaTablePrefix prefix) {
//prefix: lakesoul.default.table1
String schema = prefix.getSchemaName();
String tableNamePrefix = prefix.getTableName();
List<String> tableNames = dbManager.listTableNamesByNamespace(schema);
Map<SchemaTableName, List<ColumnMetadata>> results = new HashMap<>();
for(String tableName : tableNames){
if(tableName.startsWith(tableNamePrefix)){
for (String tableName : tableNames) {
if (tableName.startsWith(tableNamePrefix)) {
SchemaTableName schemaTableName = new SchemaTableName(schema, tableName);
ConnectorTableHandle tableHandle = getTableHandle(session, schemaTableName);
ConnectorTableMetadata tableMetadata = getTableMetadata(session, tableHandle);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public LakeSoulRecordCursor(LakeSoulRecordSet recordSet) throws IOException {

List<Field> fields = recordSet.getColumnHandles().stream().map(item -> {
LakeSoulTableColumnHandle columnHandle = (LakeSoulTableColumnHandle) item;
return Field.nullable(columnHandle.getColumnName(), ArrowUtil.convertToArrowType(columnHandle.getColumnType()));
return Field.nullable(columnHandle.getColumnName(),
ArrowUtil.convertToArrowType(columnHandle.getColumnType()));
}).collect(Collectors.toList());
HashMap<String, ColumnHandle> allcolumns = split.getLayout().getAllColumns();
List<String> dataCols = recordSet.getColumnHandles().stream().map(item -> {
Expand All @@ -64,12 +65,15 @@ public LakeSoulRecordCursor(LakeSoulRecordSet recordSet) throws IOException {
for (String item : prikeys) {
if (!dataCols.contains(item)) {
LakeSoulTableColumnHandle columnHandle = (LakeSoulTableColumnHandle) allcolumns.get(item);
fields.add(Field.nullable(columnHandle.getColumnName(), ArrowUtil.convertToArrowType(columnHandle.getColumnType())));
fields.add(Field.nullable(columnHandle.getColumnName(),
ArrowUtil.convertToArrowType(columnHandle.getColumnType())));
}
}
// add extra cdc column
String cdcColumn = this.recordSet.getSplit().getLayout().getTableParameters().getString(PrestoUtil.CDC_CHANGE_COLUMN);
if(cdcColumn != null){
String
cdcColumn =
this.recordSet.getSplit().getLayout().getTableParameters().getString(PrestoUtil.CDC_CHANGE_COLUMN);
if (cdcColumn != null) {
fields.add(Field.notNullable(cdcColumn, new ArrowType.Utf8()));
}

Expand All @@ -78,7 +82,11 @@ public LakeSoulRecordCursor(LakeSoulRecordSet recordSet) throws IOException {
for (Map.Entry<String, String> partition : this.partitions.entrySet()) {
reader.setDefaultColumnValue(partition.getKey(), partition.getValue());
}
desiredTypes = recordSet.getColumnHandles().stream().map(item -> ((LakeSoulTableColumnHandle) item).getColumnType()).collect(Collectors.toList());
desiredTypes =
recordSet.getColumnHandles()
.stream()
.map(item -> ((LakeSoulTableColumnHandle) item).getColumnType())
.collect(Collectors.toList());
// set filters
this.recordSet.getSplit().getLayout().getFilters().forEach((filter) -> reader.addFilter(filter.toString()));
// set s3 options
Expand Down Expand Up @@ -122,11 +130,13 @@ public Type getType(int field) {

@Override
public boolean advanceNextPosition() {
String cdcColumn = this.recordSet.getSplit().getLayout().getTableParameters().getString(PrestoUtil.CDC_CHANGE_COLUMN);
String
cdcColumn =
this.recordSet.getSplit().getLayout().getTableParameters().getString(PrestoUtil.CDC_CHANGE_COLUMN);
if (cdcColumn != null) {
while(next()){
while (next()) {
FieldVector vector = currentVCR.getVector(cdcColumn);
if(!vector.getObject(curRecordIdx).toString().equals("delete")){
if (!vector.getObject(curRecordIdx).toString().equals("delete")) {
return true;
}
}
Expand All @@ -136,7 +146,7 @@ public boolean advanceNextPosition() {
}
}

private boolean next(){
private boolean next() {
if (currentVCR == null) {
return false;
}
Expand Down Expand Up @@ -188,7 +198,8 @@ public long getLong(int field) {
if (timeZone.equals("") || !Arrays.asList(TimeZone.getAvailableIDs()).contains(timeZone)) {
timeZone = TimeZone.getDefault().getID();
}
return DateTimeEncoding.packDateTimeWithZone(((TimeStampMicroTZVector) fv).get(curRecordIdx) / 1000, ZoneId.of(timeZone).toString());
return DateTimeEncoding.packDateTimeWithZone(((TimeStampMicroTZVector) fv).get(curRecordIdx) / 1000,
ZoneId.of(timeZone).toString());
}
if (fv instanceof DecimalVector) {
BigDecimal dv = ((DecimalVector) fv).getObject(curRecordIdx);
Expand Down Expand Up @@ -231,7 +242,10 @@ public Slice getSlice(int field) {
if (value instanceof BigDecimal) {
return Decimals.encodeScaledValue((BigDecimal) value);
}
throw new IllegalArgumentException("Field " + field + " is not a String, but is a " + value.getClass().getName());
throw new IllegalArgumentException("Field " +
field +
" is not a String, but is a " +
value.getClass().getName());
}

@Override
Expand Down
Loading