Skip to content

Commit f392007

Browse files
Merge pull request #551 from spotify/support_parquet
Support parquet in BigDiffy/BigSampler
2 parents fe7ad63 + e310ab4 commit f392007

File tree

21 files changed

+891
-454
lines changed

21 files changed

+891
-454
lines changed

build.sbt

Lines changed: 3 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ lazy val ratatoolSampling = project
174174
libraryDependencies ++= Seq(
175175
"com.spotify" %% "scio-core" % scioVersion,
176176
"com.spotify" %% "scio-avro" % scioVersion,
177+
"com.spotify" %% "scio-parquet" % scioVersion,
177178
"com.spotify" %% "scio-google-cloud-platform" % scioVersion,
178179
"com.spotify" %% "scio-test" % scioVersion % "test",
179180
"org.apache.beam" % "beam-runners-direct-java" % beamVersion,
@@ -201,6 +202,7 @@ lazy val ratatoolDiffy = project
201202
name := "ratatool-diffy",
202203
libraryDependencies ++= Seq(
203204
"com.spotify" %% "scio-core" % scioVersion,
205+
"com.spotify" %% "scio-parquet" % scioVersion,
204206
"com.spotify" %% "scio-test" % scioVersion % "test",
205207
"org.apache.beam" % "beam-runners-direct-java" % beamVersion,
206208
"org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % beamVersion,
@@ -224,7 +226,7 @@ lazy val ratatoolDiffy = project
224226
.enablePlugins(ProtobufPlugin)
225227
.dependsOn(
226228
ratatoolCommon % "compile->compile;test->test",
227-
ratatoolSampling,
229+
ratatoolSampling % "compile->compile;test->test",
228230
ratatoolScalacheck % "test"
229231
)
230232
.settings(protoBufSettings)
@@ -248,28 +250,6 @@ lazy val ratatoolShapeless = project
248250
ratatoolSampling
249251
)
250252

251-
lazy val ratatoolExtras = project
252-
.in(file("ratatool-extras"))
253-
.settings(commonSettings)
254-
.settings(
255-
name := "ratatool-extras",
256-
libraryDependencies ++= Seq(
257-
"org.apache.parquet" % "parquet-avro" % parquetVersion,
258-
"org.apache.avro" % "avro" % avroVersion,
259-
"org.apache.hadoop" % "hadoop-client" % hadoopVersion exclude ("org.slf4j", "slf4j-log4j12"),
260-
"com.google.cloud.bigdataoss" % "gcs-connector" % s"hadoop2-$gcsVersion",
261-
"com.google.cloud.bigdataoss" % "util" % gcsVersion,
262-
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
263-
"com.google.flogger" % "flogger-system-backend" % floggerVersion % "test"
264-
),
265-
testOptions in Test += Tests.Argument(TestFrameworks.ScalaCheck, "-verbosity", "3")
266-
)
267-
.dependsOn(
268-
ratatoolSampling % "compile->compile;test->test",
269-
ratatoolCommon % "test->test",
270-
ratatoolScalacheck % "test"
271-
)
272-
273253
lazy val ratatoolCli = project
274254
.in(file("ratatool-cli"))
275255
.settings(commonSettings ++ noPublishSettings)
@@ -335,7 +315,6 @@ val root = project
335315
ratatoolDiffy,
336316
ratatoolSampling,
337317
ratatoolShapeless,
338-
ratatoolExtras,
339318
ratatoolCli,
340319
ratatoolExamples
341320
)

ratatool-cli/src/main/scala/com/spotify/ratatool/tool/DirectSamplerParser.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,20 @@ object DirectSamplerParser extends Command {
7070
)
7171
)
7272

73+
cmd("parquet")
74+
.action((_, c) => c.copy(mode = "parquet"))
75+
.text("Sample from Parquet")
76+
.children(
77+
opt[String]("in")
78+
.required()
79+
.action((x, c) => c.copy(in = x))
80+
.text("Parquet input path"),
81+
opt[String]("out")
82+
.required()
83+
.action((x, c) => c.copy(out = x))
84+
.text("Parquet output file")
85+
)
86+
7387
note("") // empty line
7488

7589
note("Common options")

ratatool-cli/src/main/scala/com/spotify/ratatool/tool/Ratatool.scala

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ package com.spotify.ratatool.tool
1919

2020
import com.spotify.ratatool.Command
2121
import com.spotify.ratatool.diffy.BigDiffy
22-
import com.spotify.ratatool.io.{AvroIO, BigQueryIO, TableRowJsonIO}
23-
import com.spotify.ratatool.samplers.{AvroSampler, BigQuerySampler, BigSampler}
22+
import com.spotify.ratatool.io.{AvroIO, BigQueryIO, ParquetIO, TableRowJsonIO}
23+
import com.spotify.ratatool.samplers.{AvroSampler, BigQuerySampler, BigSampler, ParquetSampler}
2424

2525
object Ratatool {
2626
private def commandSet[T <: Command](xs: T*): Set[String] = xs.map(_.command).toSet
@@ -62,9 +62,8 @@ object Ratatool {
6262
BigQueryIO.writeToTable(data, sampler.schema, table)
6363
}
6464
case "parquet" =>
65-
throw new NotImplementedError("""We have moved ParquetSampler to the ratatool-extras
66-
| project. If this causes any problems for you please open an issue
67-
| on github and let us know.""".stripMargin)
65+
val data = new ParquetSampler(o.in).sample(o.n, o.head)
66+
ParquetIO.writeToFile(data, data.head.getSchema, o.out)
6867
case _ =>
6968
throw new NotImplementedError(s"${o.mode} not implemented")
7069
}

ratatool-cli/src/test/scala/com/spotify/ratatool/tool/DirectSamplerParserTest.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,10 @@ class DirectSamplerParserTest extends AnyFlatSpec with Matchers {
3939
Some(c.copy(head = true, tableOut = "table:out"))
4040
)
4141
}
42+
43+
it should "parse parquet command" in {
44+
val c = config.copy(mode = "parquet")
45+
parse("parquet --in in --out out -n 1000") should equal(Some(c))
46+
parse("parquet --in in --out out -n 1000 --head") should equal(Some(c.copy(head = true)))
47+
}
4248
}

ratatool-diffy/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ From the CLI
2323
BigDiffy - pair-wise field-level statistical diff
2424
Usage: ratatool bigDiffy [dataflow_options] [options]
2525
26-
--input-mode=(avro|bigquery) Diff-ing Avro or BQ records
26+
--input-mode=(avro|bigquery|parquet) Diff-ing Avro, BQ or Parquet records
2727
[--output-mode=(gcs|bigquery)] Saves to a text file in GCS or a BigQuery dataset. Defaults to GCS
2828
--key=<key> '.' separated key field. Specify multiple --key params or multiple ',' separated key fields for multi key usage.
2929
--lhs=<path> LHS File path or BigQuery table

ratatool-diffy/src/main/scala/com/spotify/ratatool/diffy/BigDiffy.scala

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,24 @@
1818
package com.spotify.ratatool.diffy
1919

2020
import java.nio.ByteBuffer
21-
2221
import com.google.api.services.bigquery.model.{TableFieldSchema, TableRow, TableSchema}
2322
import com.google.common.io.BaseEncoding
2423
import com.google.protobuf.AbstractMessage
2524
import com.spotify.ratatool.Command
26-
import com.spotify.ratatool.samplers.AvroSampler
25+
import com.spotify.ratatool.io.ParquetIO
26+
import com.spotify.ratatool.samplers.{AvroSampler, ParquetSampler}
2727
import com.spotify.scio._
2828
import com.spotify.scio.avro._
2929
import com.spotify.scio.bigquery._
30+
import com.spotify.scio.parquet.avro._
3031
import com.spotify.scio.bigquery.client.BigQuery
3132
import com.spotify.scio.bigquery.types.BigQueryType
3233
import com.spotify.scio.coders.Coder
3334
import com.spotify.scio.io.ClosedTap
3435
import com.spotify.scio.values.SCollection
3536
import com.twitter.algebird._
37+
import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType
38+
import org.apache.avro.{Schema, SchemaCompatibility}
3639
import org.apache.avro.generic.GenericRecord
3740
import org.apache.avro.specific.SpecificRecordBase
3841
import org.apache.beam.sdk.io.TextIO
@@ -183,7 +186,6 @@ class BigDiffy[T: Coder](
183186

184187
/** Field level statistics. */
185188
lazy val fieldStats: SCollection[FieldStats] = globalAndFieldStats.flatMap(_._2)
186-
187189
}
188190

189191
sealed trait OutputMode
@@ -338,6 +340,26 @@ object BigDiffy extends Command with Serializable {
338340
): BigDiffy[T] =
339341
diff(sc.protobufFile(lhs), sc.protobufFile(rhs), diffy, keyFn)
340342

343+
/**
344+
* Diff two Parquet data sets.
345+
* Note that both typed-parquet and avro-parquet inputs are supported. However, in either case
346+
* the diff will be written in Parquet format as Avro GenericRecords. */
347+
def diffParquet(
348+
sc: ScioContext,
349+
lhs: String,
350+
rhs: String,
351+
keyFn: GenericRecord => MultiKey,
352+
diffy: AvroDiffy[GenericRecord]
353+
): BigDiffy[GenericRecord] = {
354+
val compatSchema = ParquetIO.getCompatibleSchemaForFiles(lhs, rhs)
355+
implicit val grCoder: Coder[GenericRecord] = Coder.avroGenericRecordCoder(compatSchema)
356+
357+
diff(
358+
sc.parquetAvroFile[GenericRecord](lhs, compatSchema).map(identity),
359+
sc.parquetAvroFile[GenericRecord](rhs, compatSchema).map(identity), diffy, keyFn
360+
)
361+
}
362+
341363
/** Remove quotes wrapping string argument. **/
342364
def stripQuoteWrap(input: String): String = {
343365
val startChar = input.charAt(0)
@@ -738,6 +760,14 @@ object BigDiffy extends Command with Serializable {
738760
val rhsSCollection = sc.avroFile(rhs, schema)
739761
BigDiffy
740762
.diff[GenericRecord](lhsSCollection, rhsSCollection, diffy, avroKeyFn(keys), ignoreNan)
763+
case "parquet" =>
764+
if (rowRestriction.isDefined) {
765+
throw new IllegalArgumentException(s"rowRestriction cannot be passed for Parquet inputs")
766+
}
767+
val compatSchema = ParquetIO.getCompatibleSchemaForFiles(lhs, rhs)
768+
val diffy = new AvroDiffy[GenericRecord](ignore, unordered, unorderedKeys)(
769+
Coder.avroGenericRecordCoder(compatSchema))
770+
BigDiffy.diffParquet(sc, lhs, rhs, avroKeyFn(keys), diffy)
741771
case "bigquery" =>
742772
// TODO: handle schema evolution
743773
val bq = BigQuery.defaultInstance()
@@ -755,5 +785,4 @@ object BigDiffy extends Command with Serializable {
755785
}
756786
// scalastyle:on cyclomatic.complexity
757787
// scalastyle:on method.length
758-
759788
}

ratatool-diffy/src/test/scala/com/spotify/ratatool/diffy/BigDiffyTest.scala

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,11 @@ import com.spotify.ratatool.avro.specific.{RequiredNestedRecord, TestRecord}
2525
import com.spotify.ratatool.scalacheck._
2626
import com.spotify.scio.testing.PipelineSpec
2727
import com.google.api.services.bigquery.model.TableRow
28-
import com.spotify.ratatool.diffy.BigDiffy.stripQuoteWrap
28+
import com.spotify.ratatool.diffy.BigDiffy.{avroKeyFn, stripQuoteWrap}
29+
import com.spotify.ratatool.io.{ParquetIO, ParquetTestData}
30+
import com.spotify.scio.ScioContext
31+
import org.apache.avro.Schema
32+
import org.apache.avro.generic.{GenericData, GenericRecord}
2933
import org.apache.beam.sdk.coders.AvroCoder
3034
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder
3135

@@ -366,4 +370,70 @@ class BigDiffyTest extends PipelineSpec {
366370

367371
exc.getMessage shouldBe "rowRestriction cannot be passed for avro inputs"
368372
}
373+
374+
it should "throw an exception when rowRestriction is specified for a parquet input" in {
375+
val exc = the[IllegalArgumentException] thrownBy {
376+
val args = Array(
377+
"--runner=DataflowRunner",
378+
"--project=fake",
379+
"--tempLocation=gs://tmp/tmp", // dataflow args
380+
"--input-mode=parquet",
381+
"--key=tmp",
382+
"--lhs=gs://tmp/lhs",
383+
"--rhs=gs://tmp/rhs",
384+
"--rowRestriction=true",
385+
"--output=gs://abc"
386+
)
387+
BigDiffy.run(args)
388+
}
389+
390+
exc.getMessage shouldBe "rowRestriction cannot be passed for Parquet inputs"
391+
}
392+
393+
it should "support Parquet schema evolution" in {
394+
val schema1 = new Schema.Parser().parse(
395+
"""|{"type":"record",
396+
|"name":"ParquetRecord",
397+
|"namespace":"com.spotify.ratatool.diffy",
398+
|"fields":[{"name":"id","type":"int"}]}
399+
""".stripMargin
400+
)
401+
// Schema 2 adds a nullable field "s" with a default value of null
402+
val schema2 = new Schema.Parser().parse(
403+
"""|{"type":"record",
404+
|"name":"ParquetRecord",
405+
|"namespace":"com.spotify.ratatool.diffy",
406+
|"fields":[
407+
|{"name":"id","type":"int"},
408+
|{"name":"s","type":["null","string"],"default":null}]}
409+
""".stripMargin
410+
)
411+
412+
def toGenericRecord(schema: Schema, fields: Map[String, _]): GenericRecord = {
413+
val gr = new GenericData.Record(schema)
414+
fields.foreach { case (k, v) => gr.put(k, v) }
415+
gr
416+
}
417+
val (lhs, rhs) = (
418+
ParquetTestData.createTempDir("lhs") + "/out.parquet",
419+
ParquetTestData.createTempDir("rhs") + "/out.parquet")
420+
421+
ParquetIO.writeToFile(
422+
(1 to 10).map(i => toGenericRecord(schema1, Map("id" -> i))),
423+
schema1, rhs)
424+
425+
ParquetIO.writeToFile(
426+
(1 to 9).map(i => toGenericRecord(schema2, Map("id" -> i, "s" -> i.toString))),
427+
schema2, lhs)
428+
429+
val sc = ScioContext()
430+
val bigDiffy = BigDiffy.diffParquet(sc,
431+
lhs,
432+
rhs,
433+
avroKeyFn(Seq("id")),
434+
new AvroDiffy[GenericRecord]())
435+
436+
bigDiffy.keyStats.filter(_.diffType == DiffType.MISSING_LHS) should haveSize(1)
437+
sc.run()
438+
}
369439
}

ratatool-extras/README.md

Lines changed: 0 additions & 13 deletions
This file was deleted.

0 commit comments

Comments
 (0)