Cosmos spark3 write code path DataSourceV2 skeleton #17532

Merged
26 changes: 26 additions & 0 deletions sdk/cosmos/azure-cosmos-spark_3-0_2-12/pom.xml
@@ -63,6 +63,12 @@
<version>1.7.30</version> <!-- {x-version-update;org.slf4j:slf4j-simple;external_dependency} -->
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.16.1</version> <!-- {x-version-update;org.assertj:assertj-core;external_dependency} -->
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
@@ -81,6 +87,25 @@
<version>3.2.2</version> <!-- {x-version-update;cosmos_org.scalatest:scalatest_2.12;external_dependency} -->
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalactic</groupId>
<artifactId>scalactic_2.12</artifactId>
<version>3.2.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-flatspec_2.12</artifactId>
<version>3.2.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalamock</groupId>
<artifactId>scalamock_2.12</artifactId>
<version>5.0.0</version>
<scope>test</scope>
</dependency>

<!-- Added this provided dependency to include necessary annotations used by "reactor-core".
Without this dependency, javadoc throws a warning as it cannot find enum When.MAYBE
which is used in @Nullable annotation in reactor core classes.
@@ -108,6 +133,7 @@
<filtering>true</filtering>
<includes>
<include>META-INF/project.properties</include>
<include>META-INF/services/org.apache.spark.sql.sources.DataSourceRegister</include>
</includes>
</resource>
</resources>

META-INF/services/org.apache.spark.sql.sources.DataSourceRegister (new file)
@@ -0,0 +1 @@
com.azure.cosmos.spark.CosmosDataSource
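
For context, this one-line service file is what lets Spark resolve the connector by name: Spark's ServiceLoader scans every DataSourceRegister implementation listed under META-INF/services and matches on shortName(). A minimal sketch of that lookup, illustrative only and not code from this PR:

import java.util.ServiceLoader
import org.apache.spark.sql.sources.DataSourceRegister
import scala.collection.JavaConverters._

// Illustrative only: roughly what Spark does when resolving format("cosmos.write").
val provider = ServiceLoader.load(classOf[DataSourceRegister])
  .asScala
  .find(_.shortName() == "cosmos.write")
provider.foreach(p => println(s"Resolved ${p.getClass.getName}"))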
CosmosBatchWriter.scala (new file)
@@ -0,0 +1,19 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.spark

import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, PhysicalWriteInfo, WriterCommitMessage}

class CosmosBatchWriter extends BatchWrite with CosmosLoggingTrait {
logInfo(s"Instantiated ${this.getClass.getSimpleName}")

override def createBatchWriterFactory(physicalWriteInfo: PhysicalWriteInfo): DataWriterFactory = new CosmosDataWriteFactory()

override def commit(writerCommitMessages: Array[WriterCommitMessage]): Unit = {
// TODO
}

override def abort(writerCommitMessages: Array[WriterCommitMessage]): Unit = {
// TODO
}
}
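
For orientation on the lifecycle: Spark calls createBatchWriterFactory once on the driver, each task's DataWriter returns a WriterCommitMessage from its commit(), and the driver then invokes BatchWrite.commit (or abort) with all of those messages. A hedged sketch of how the TODO commit could eventually aggregate per-task results; the CosmosWriterCommitMessage type is hypothetical and not part of this PR:

// Hypothetical commit message carrying a per-task write count (not in this PR).
case class CosmosWriterCommitMessage(itemsWritten: Long) extends WriterCommitMessage

// Sketch: aggregate the per-task messages on the driver.
override def commit(writerCommitMessages: Array[WriterCommitMessage]): Unit = {
  val total = writerCommitMessages
    .collect { case m: CosmosWriterCommitMessage => m.itemsWritten }
    .sum
  logInfo(s"Committed batch write of $total items")
}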
CosmosDataSource.scala (new file)
@@ -0,0 +1,28 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.spark

import java.util

import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class CosmosDataSource extends DataSourceRegister with TableProvider with CosmosLoggingTrait {
logInfo(s"Instantiated ${this.getClass.getSimpleName}")

override def inferSchema(caseInsensitiveStringMap: CaseInsensitiveStringMap): StructType = {
getTable(null,
Array.empty[Transform],
caseInsensitiveStringMap.asCaseSensitiveMap()).schema()
}

override def shortName(): String = "cosmos.write"
Member:

In my mental model I thought about something like:

cosmos.items (which would implement the interfaces for read and write, batch and point)

cosmos.changefeed for change feed - just read interfaces

@moderakh (Contributor Author), Nov 12, 2020:

I had picked "cosmos.write" to get started. "cosmos.items" for batch read/write looks better to me.

Regarding "cosmos.changefeed", that will be used mainly for the streaming scenario, right? Should we add a streaming suffix?

Contributor Author:

Done.

Member:

Change feed would be used for both streaming and batch (mostly streaming, I guess, but we also have plenty of customers using batch at regular intervals).
Streaming and batch are just different notions on the read/write capabilities, so I would not add any prefixes there.
But between Items and Changefeed there are significant differences - write on the change feed isn't possible, and even read is very different because, for example, predicate pushdown isn't possible for the change feed.


override def getTable(structType: StructType, transforms: Array[Transform], map: util.Map[String, String]): Table = {
// getTable - used for loading a table with a user-specified schema and other transforms.
new CosmosTable(structType, transforms, map)
}
}
CosmosDataWriteFactory.scala (new file)
@@ -0,0 +1,57 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.spark

import java.util.UUID

import com.azure.cosmos.implementation.TestConfigurations
import com.azure.cosmos.{ConsistencyLevel, CosmosClientBuilder}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.write.{DataWriter, DataWriterFactory, WriterCommitMessage}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

class CosmosDataWriteFactory extends DataWriterFactory with CosmosLoggingTrait {
logInfo(s"Instantiated ${this.getClass.getSimpleName}")

override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = new CosmosWriter()

class CosmosWriter() extends DataWriter[InternalRow] {
logInfo(s"Instantiated ${this.getClass.getSimpleName}")

// TODO (moderakh): account config, databaseName and containerName need to be passed down from the user
val client = new CosmosClientBuilder()
.key(TestConfigurations.MASTER_KEY)
Member:

I assume long term we would want to have a cache similar to what I added in the 3.* release for today's OLTP connector? If so, I can take a stab at that early next week.

Member:

Similar to CosmosDBConnectionCache.scala

.endpoint(TestConfigurations.HOST)
.consistencyLevel(ConsistencyLevel.EVENTUAL)
.buildAsyncClient()
val databaseName = "testDB"
val containerName = "testContainer"

override def write(internalRow: InternalRow): Unit = {
// TODO (moderakh): schema is hard-coded for now to make the end-to-end TestE2EMain work; implement schema inference
val userProvidedSchema = StructType(Seq(StructField("number", IntegerType), StructField("word", StringType)))

val objectNode = CosmosRowConverter.internalRowToObjectNode(internalRow, userProvidedSchema)
// TODO (moderakh): how should we handle the absence of id?
Member:

Looks like the best approach - to generate it in the Spark layer before calling the SDK.

if (!objectNode.has("id")) {
objectNode.put("id", UUID.randomUUID().toString)
}
client.getDatabase(databaseName)
.getContainer(containerName)
.createItem(objectNode)
.block()
}

override def commit(): WriterCommitMessage = {
new WriterCommitMessage {}
}

override def abort(): Unit = {
// TODO
}

override def close(): Unit = {
// TODO
}
}
}
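
Following the reviewer's cache suggestion above (in the spirit of CosmosDBConnectionCache.scala from the existing OLTP connector), a minimal sketch of a client cache; the object name and shape are hypothetical, not part of this PR:

import java.util.concurrent.ConcurrentHashMap
import com.azure.cosmos.{ConsistencyLevel, CosmosAsyncClient, CosmosClientBuilder}

// Hypothetical cache keyed on (endpoint, key) so each executor reuses one
// CosmosAsyncClient per account instead of building one per DataWriter.
object CosmosClientCache {
  private val clients = new ConcurrentHashMap[(String, String), CosmosAsyncClient]()

  def getOrCreate(endpoint: String, key: String): CosmosAsyncClient =
    clients.computeIfAbsent((endpoint, key), _ =>
      new CosmosClientBuilder()
        .endpoint(endpoint)
        .key(key)
        .consistencyLevel(ConsistencyLevel.EVENTUAL)
        .buildAsyncClient())
}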
CosmosLoggingTrait.scala (new file)
@@ -0,0 +1,69 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.spark

import org.slf4j.{Logger, LoggerFactory}


trait CosmosLoggingTrait {
// Make the log field transient so that objects with Logging can
// be serialized and used on another machine
@transient private var log_ : Logger = _ // scalastyle:ignore

// Method to get the logger name for this object
protected def logName: String = {
// Ignore trailing $'s in the class names for Scala objects
this.getClass.getName.stripSuffix("$")
}

// Method to get or create the logger for this object
protected def log: Logger = {
if (log_ == null) {
// scalastyle:ignore
log_ = LoggerFactory.getLogger(logName)
}
log_
}

// Log methods that take only a String
protected def logInfo(msg: => String) {
if (log.isInfoEnabled) log.info(msg)
}

protected def logDebug(msg: => String) {
if (log.isDebugEnabled) log.debug(msg)
}

protected def logTrace(msg: => String) {
if (log.isTraceEnabled) log.trace(msg)
}

protected def logWarning(msg: => String) {
if (log.isWarnEnabled) log.warn(msg)
}

protected def logError(msg: => String) {
if (log.isErrorEnabled) log.error(msg)
}

// Log methods that take Throwables (Exceptions/Errors) too
protected def logInfo(msg: => String, throwable: Throwable) {
if (log.isInfoEnabled) log.info(msg, throwable)
}

protected def logDebug(msg: => String, throwable: Throwable) {
if (log.isDebugEnabled) log.debug(msg, throwable)
}

protected def logTrace(msg: => String, throwable: Throwable) {
if (log.isTraceEnabled) log.trace(msg, throwable)
}

protected def logWarning(msg: => String, throwable: Throwable) {
if (log.isWarnEnabled) log.warn(msg, throwable)
}

protected def logError(msg: => String, throwable: Throwable) {
if (log.isErrorEnabled) log.error(msg, throwable)
}
}
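
Note the msg: => String by-name parameters: the interpolated message is only constructed when the corresponding log level is enabled. A small hypothetical usage sketch:

// Hypothetical component: the debug string is only built when DEBUG is enabled.
class ExampleComponent extends CosmosLoggingTrait {
  def process(items: Seq[String]): Unit = {
    logDebug(s"Processing items: ${items.mkString(", ")}")
    logInfo(s"Processed ${items.size} items")
  }
}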
CosmosTable.scala (new file)
@@ -0,0 +1,38 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.spark

import java.util

import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

import scala.collection.JavaConverters._

/**
 * CosmosTable is the entry point of the connector; it is the Table implementation
 * that CosmosDataSource hands to Spark.
 * @param userProvidedSchema the schema provided by the user, if any
 * @param transforms partitioning transforms
 * @param map user-provided options
 */
class CosmosTable(val userProvidedSchema: StructType,
val transforms: Array[Transform],
val map: util.Map[String, String])
extends Table with SupportsWrite with CosmosLoggingTrait {
logInfo(s"Instantiated ${this.getClass.getSimpleName}")

override def name(): String = "com.azure.cosmos.spark.write"

override def schema(): StructType = {
// TODO (moderakh): add support for schema inference
// for now the schema is hard-coded to make TestE2EMain work
StructType(Seq(StructField("number", IntegerType), StructField("word", StringType)))
}

override def capabilities(): util.Set[TableCapability] = Set(TableCapability.BATCH_WRITE).asJava

override def newWriteBuilder(logicalWriteInfo: LogicalWriteInfo): WriteBuilder = new CosmosWriterBuilder

}
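
Once schema inference lands, schema() would presumably prefer a user-provided schema when one is supplied; a hedged sketch of that fallback (the hard-coded branch matches the current TestE2EMain schema):

// Sketch only, not part of this PR: fall back to the hard-coded schema
// until inference is implemented.
override def schema(): StructType =
  Option(userProvidedSchema).getOrElse(
    StructType(Seq(StructField("number", IntegerType), StructField("word", StringType))))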
CosmosWriterBuilder.scala (new file)
@@ -0,0 +1,11 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.spark

import org.apache.spark.sql.connector.write.{BatchWrite, WriteBuilder}

class CosmosWriterBuilder extends WriteBuilder with CosmosLoggingTrait {
logInfo(s"Instantiated ${this.getClass.getSimpleName}")

override def buildForBatch(): BatchWrite = new CosmosBatchWriter()
}
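
Putting the pieces together, a hypothetical end-to-end run of this write path, assuming the connector jar is on the classpath and the hard-coded testDB/testContainer exist: Spark resolves CosmosDataSource by its short name, asks CosmosTable for a CosmosWriterBuilder, and each partition's CosmosWriter writes one document per row via createItem.

import org.apache.spark.sql.SparkSession

// Hypothetical driver program, not part of this PR.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("cosmos-write-skeleton-demo")
  .getOrCreate()

// Matches the schema hard-coded in CosmosTable and CosmosDataWriteFactory.
val df = spark.createDataFrame(Seq((1, "hello"), (2, "world"))).toDF("number", "word")

df.write
  .format("cosmos.write") // the shortName registered via META-INF/services
  .mode("append")         // the table only declares BATCH_WRITE
  .save()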
