Skip to content

Commit 1e25fa7

Browse files
authored
Fix filepath wildcard for avro/parquet (#564)
Fix filepath wildcard for avro/parquet for sampler
1 parent 1d816bd commit 1e25fa7

File tree

2 files changed

+75
-5
lines changed

2 files changed

+75
-5
lines changed

ratatool-sampling/src/main/scala/com/spotify/ratatool/samplers/BigSampler.scala

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@ package com.spotify.ratatool.samplers
1818

1919
import java.net.URI
2020
import java.nio.charset.Charset
21-
2221
import com.google.api.services.bigquery.model.{TableFieldSchema, TableReference}
2322
import com.google.common.hash.{HashCode, Hasher, Hashing}
2423
import com.spotify.ratatool.samplers.util.SamplerSCollectionFunctions._
2524
import com.spotify.ratatool.Command
2625
import com.spotify.ratatool.avro.specific.TestRecord
26+
import com.spotify.ratatool.io.FileStorage
2727
import com.spotify.ratatool.samplers.util._
2828
import com.spotify.scio.bigquery.TableRow
2929
import com.spotify.scio.io.ClosedTap
@@ -34,6 +34,7 @@ import org.apache.avro.Schema
3434
import org.apache.avro.generic.GenericRecord
3535
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions
3636
import org.apache.beam.sdk.io.FileSystems
37+
import org.apache.beam.sdk.io.fs.MatchResult.Metadata
3738
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers
3839
import org.apache.beam.sdk.options.PipelineOptions
3940
import org.slf4j.LoggerFactory
@@ -156,6 +157,11 @@ object BigSampler extends Command {
156157
): Hasher =
157158
BigSamplerAvro.hashAvroField(avroSchema)(r, f, hasher)
158159

160+
/**
 * Looks up the metadata of the files that `path` refers to.
 *
 * Callers pass the sampler's `--input` value, which may be a wildcard pattern
 * (the tests use inputs such as `dir/part-*`).
 *
 * @param path location handed to `FileStorage`
 * @return the result of `listFiles` on the `FileStorage` for `path`
 * @throws IllegalArgumentException if the `FileStorage` for `path` reports that it does not exist
 */
private[samplers] def getMetadata(path: String): Seq[Metadata] = {
  val storage = FileStorage(path)
  require(storage.exists, s"File `$path` does not exist!")
  storage.listFiles
}
164+
159165
// scalastyle:off method.length cyclomatic.complexity
160166
def singleInput(argv: Array[String]): ClosedTap[_] = {
161167
val (sc, args) = ContextAndArgs(argv)
@@ -231,6 +237,7 @@ object BigSampler extends Command {
231237
)
232238
val inputTbl = parseAsBigQueryTable(input).get
233239
val outputTbl = parseAsBigQueryTable(output).get
240+
234241
BigSamplerBigQuery.sample(
235242
sc,
236243
inputTbl,
@@ -253,8 +260,11 @@ object BigSampler extends Command {
253260
)
254261
// Prompts FileSystems to load service classes, otherwise fetching schema from non-local fails
255262
FileSystems.setDefaultPipelineOptions(opts)
263+
val fileNames = getMetadata(input).map(_.resourceId().getFilename)
264+
256265
input match {
257-
case avroPath if input.endsWith("avro") =>
266+
case avroPath if fileNames.exists(_.endsWith("avro")) =>
267+
log.info(s"Found *.avro files in $avroPath, running BigSamplerAvro")
258268
BigSamplerAvro.sample(
259269
sc,
260270
avroPath,
@@ -269,7 +279,8 @@ object BigSampler extends Command {
269279
sizePerKey,
270280
byteEncoding
271281
)
272-
case parquetPath if input.endsWith("parquet") =>
282+
case parquetPath if fileNames.exists(_.endsWith("parquet")) =>
283+
log.info(s"Found *.parquet files in $parquetPath, running BigSamplerParquet")
273284
BigSamplerParquet.sample(
274285
sc,
275286
parquetPath,

ratatool-sampling/src/test/scala/com/spotify/ratatool/samplers/BigSamplerTest.scala

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,12 @@ package com.spotify.ratatool.samplers
1919
import java.io.File
2020
import java.nio.ByteBuffer
2121
import java.nio.file.{Files, Path}
22-
2322
import com.google.common.hash.Hasher
2423
import com.google.common.io.BaseEncoding
2524
import com.spotify.ratatool.Schemas
2625
import com.spotify.ratatool.avro.specific.TestRecord
2726
import com.spotify.ratatool.scalacheck._
28-
import com.spotify.ratatool.io.{AvroIO, FileStorage}
27+
import com.spotify.ratatool.io.{AvroIO, FileStorage, ParquetIO}
2928
import com.spotify.ratatool.samplers.util.{ByteHasher, HexEncoding, MurmurHash}
3029
import org.apache.avro.generic.GenericRecord
3130
import org.scalacheck.Prop.{all, forAll, proved}
@@ -38,6 +37,7 @@ import scala.language.postfixOps
3837
import org.scalatest.flatspec.AnyFlatSpec
3938
import org.scalatest.matchers.should.Matchers
4039

40+
// scalastyle:off file.size.limit
4141
object BigSamplerTest extends Properties("BigSampler") {
4242

4343
private val testSeed = Some(42)
@@ -453,14 +453,20 @@ sealed trait BigSamplerJobTestRoot
453453
val dir: Path = Files.createTempDirectory("ratatool-big-sampler-input")
454454
val file1 = new File(dir.toString, "part-00000.avro")
455455
val file2 = new File(dir.toString, "part-00001.avro")
456+
val fileParquet1 = new File(dir.toString, "part-00000.parquet")
457+
val fileParquet2 = new File(dir.toString, "part-00001.parquet")
456458

457459
/**
 * Writes `data1` and `data2` to the Avro and Parquet fixture files, then registers the
 * temp directory and every fixture file for deletion when the JVM exits.
 */
override protected def beforeAll(configMap: ConfigMap): Unit = {
  AvroIO.writeToFile(data1, schema, file1)
  AvroIO.writeToFile(data2, schema, file2)
  ParquetIO.writeToFile(data1, schema, fileParquet1)
  ParquetIO.writeToFile(data2, schema, fileParquet2)

  // Registration order is kept: the directory first, then the files it contains.
  val cleanupTargets = Seq(dir.toFile, file1, file2, fileParquet1, fileParquet2)
  cleanupTargets.foreach(_.deleteOnExit())
}
465471

466472
protected def withOutFile(testCode: (File) => Any) {
@@ -476,6 +482,11 @@ sealed trait BigSamplerJobTestRoot
476482
FileStorage(p).listFiles.foldLeft(0)((i, m) =>
477483
i + AvroIO.readFromFile[GenericRecord](m.resourceId().toString).count(f)
478484
)
485+
486+
/**
 * Counts the records accepted by `f` across all files listed for `p`.
 *
 * The running total is kept as a `Long` so the sum matches the declared return type
 * instead of being accumulated as an `Int` and widened afterwards.
 *
 * @param p path (or wildcard pattern) passed to `FileStorage`
 * @param f predicate a record must satisfy to be counted; accepts every record by default
 * @return total number of matching records over all listed files
 */
protected def countParquetRecords(p: String, f: GenericRecord => Boolean = _ => true): Long =
  FileStorage(p).listFiles.foldLeft(0L) { (total, meta) =>
    total + ParquetIO.readFromFile(meta.resourceId().toString).count(f)
  }
479490
}
480491

481492
class BigSamplerBasicJobTest extends BigSamplerJobTestRoot {
@@ -487,16 +498,31 @@ class BigSamplerBasicJobTest extends BigSamplerJobTestRoot {
487498
countAvroRecords(s"$outDir/*.avro").toDouble shouldBe totalElements * 0.5 +- 250
488499
}
489500

501+
// Parquet input sampled at 50%: output count must be within 250 of half of totalElements.
it should "work for 50% for parquet" in withOutFile { outDir =>
  val args = Array(s"--input=$dir/*.parquet", s"--output=$outDir", "--sample=0.5")
  BigSampler.run(args)
  val sampled = countParquetRecords(s"$outDir/*.parquet").toDouble
  sampled shouldBe totalElements * 0.5 +- 250
}
505+
490506
// Avro input sampled at 1%: output count must be within 35 of 1% of totalElements.
it should "work for 1%" in withOutFile { outDir =>
  val args = Array(s"--input=$dir/*.avro", s"--output=$outDir", "--sample=0.01")
  BigSampler.run(args)
  val sampled = countAvroRecords(s"$outDir/*.avro").toDouble
  sampled shouldBe totalElements * 0.01 +- 35
}
494510

511+
// Parquet input sampled at 1%: output count must be within 35 of 1% of totalElements.
it should "work for 1% for parquet" in withOutFile { outDir =>
  val args = Array(s"--input=$dir/*.parquet", s"--output=$outDir", "--sample=0.01")
  BigSampler.run(args)
  val sampled = countParquetRecords(s"$outDir/*.parquet").toDouble
  sampled shouldBe totalElements * 0.01 +- 35
}
515+
495516
// Avro input sampled at 100%: every input record must appear in the output.
it should "work for 100%" in withOutFile { outDir =>
  val args = Array(s"--input=$dir/*.avro", s"--output=$outDir", "--sample=1.0")
  BigSampler.run(args)
  val written = countAvroRecords(s"$outDir/*.avro")
  written shouldBe totalElements
}
499520

521+
// Parquet input sampled at 100%: every input record must appear in the output.
// This is an exact-equality check, so the count is compared directly (as in the Avro
// 100% test) rather than being converted to Double first.
it should "work for 100% for parquet" in withOutFile { outDir =>
  BigSampler.run(Array(s"--input=$dir/*.parquet", s"--output=$outDir", "--sample=1.0"))
  countParquetRecords(s"$outDir/*.parquet") shouldBe totalElements
}
525+
500526
it should "work for 50% with hash field and seed" in withOutFile { outDir =>
501527
BigSampler.run(
502528
Array(
@@ -509,8 +535,41 @@ class BigSamplerBasicJobTest extends BigSamplerJobTestRoot {
509535
)
510536
countAvroRecords(s"$outDir/*.avro").toDouble shouldBe totalElements * 0.5 +- 2000
511537
}
538+
539+
// Parquet input sampled at 50% with a fixed seed and a hash field; the tolerance here is
// 2000 records.
it should "work for 50% with hash field and seed for parquet" in withOutFile { outDir =>
  val args = Array(
    s"--input=$dir/*.parquet",
    s"--output=$outDir",
    "--sample=0.5",
    "--seed=42",
    "--fields=required_fields.int_field"
  )
  BigSampler.run(args)
  val sampled = countParquetRecords(s"$outDir/*.parquet").toDouble
  sampled shouldBe totalElements * 0.5 +- 2000
}
551+
}
552+
553+
/**
 * Runs BigSampler on an input wildcard that carries no file extension.
 *
 * Only the Parquet fixture files are written by this suite's `beforeAll`.
 */
class BigSamplerWildCardTest extends BigSamplerJobTestRoot {
  override def data1Size: Int = 10000
  override def data2Size: Int = 2500

  // Writes the Parquet fixtures only, then registers the directory and both files for
  // deletion at JVM exit (directory first, same order as before).
  override protected def beforeAll(configMap: ConfigMap): Unit = {
    ParquetIO.writeToFile(data1, schema, fileParquet1)
    ParquetIO.writeToFile(data2, schema, fileParquet2)

    Seq(dir.toFile, fileParquet1, fileParquet2).foreach(_.deleteOnExit())
  }

  "BigSampler" should "work for wildcard without file extension" in withOutFile { outDir =>
    val args = Array(s"--input=$dir/part-*", s"--output=$outDir", "--sample=0.5")
    BigSampler.run(args)
    val sampled = countParquetRecords(s"$outDir/*.parquet").toDouble
    sampled shouldBe totalElements * 0.5 +- 250
  }
}
513571

572+
514573
class BigSamplerApproxDistJobTest extends BigSamplerJobTestRoot {
515574
override def data1Size: Int = 10000
516575
override def data2Size: Int = 2500

0 commit comments

Comments
 (0)