Skip to content

Scala3 migration #11350

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions Jenkinsfile
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,25 @@ pipeline {
}
}

// Build and test with JDK 11 against Scala 3.0.
// The Kafka Streams archetype test is Maven-based and is deliberately not
// run in this stage; it is exercised by the gated Scala 3.0 stages below.
stage('JDK 11 and Scala 3.0') {
  agent { label 'ubuntu' }
  tools {
    jdk 'jdk_11_latest'
  }
  options {
    // Full validation + test cycle is slow; allow up to 8 hours.
    timeout(time: 8, unit: 'HOURS')
    timestamps()
  }
  environment {
    SCALA_VERSION=3.0
  }
  steps {
    doValidation()
    doTest(env)
    // Fixed: this stage runs on JDK 11, not Java 16 — the previous message
    // ("... for Java 16") was copy-pasted from the JDK 16 stage.
    echo 'Skipping Kafka Streams archetype test for Java 11'
  }
}

stage('ARM') {
agent { label 'arm4' }
options {
Expand Down Expand Up @@ -208,6 +227,76 @@ pipeline {
}
}

// Build and test with JDK 8 against Scala 3.0.
// Gated: runs only on branch builds, not on pull requests.
stage('JDK 8 and Scala 3.0') {
when {
// Skip this stage for change requests (PRs).
not { changeRequest() }
// Evaluate the `when` condition before allocating an agent,
// so no executor is tied up when the stage is skipped.
beforeAgent true
}
agent { label 'ubuntu' }
tools {
jdk 'jdk_1.8_latest'
// Maven is required by tryStreamsArchetype() below — the Kafka Streams
// archetype test is Maven-based.
maven 'maven_3_latest'
}
options {
// Full validation + test cycle is slow; allow up to 8 hours.
timeout(time: 8, unit: 'HOURS')
timestamps()
}
environment {
SCALA_VERSION=3.0
}
steps {
doValidation()
doTest(env)
tryStreamsArchetype()
}
}

// Build and test with JDK 15 against Scala 3.0.
// Gated: runs only on branch builds, not on pull requests.
stage('JDK 15 and Scala 3.0') {
when {
// Skip this stage for change requests (PRs).
not { changeRequest() }
// Evaluate the `when` condition before allocating an agent.
beforeAgent true
}
agent { label 'ubuntu' }
tools {
jdk 'jdk_15_latest'
// NOTE(review): no Maven tool is declared here, unlike the JDK 8 stage,
// yet tryStreamsArchetype() is called below — confirm Maven is available
// on the agent or that the helper installs/locates it itself.
}
options {
// Full validation + test cycle is slow; allow up to 8 hours.
timeout(time: 8, unit: 'HOURS')
timestamps()
}
environment {
SCALA_VERSION=3.0
}
steps {
doValidation()
doTest(env)
tryStreamsArchetype()
}
}

// Build and test with JDK 16 against Scala 3.0.
// Gated: runs only on branch builds, not on pull requests.
stage('JDK 16 and Scala 3.0') {
when {
// Skip this stage for change requests (PRs).
not { changeRequest() }
// Evaluate the `when` condition before allocating an agent.
beforeAgent true
}
agent { label 'ubuntu' }
tools {
jdk 'jdk_16_latest'
// NOTE(review): no Maven tool is declared here, unlike the JDK 8 stage,
// yet tryStreamsArchetype() is called below — confirm Maven is available
// on the agent or that the helper installs/locates it itself.
}
options {
// Full validation + test cycle is slow; allow up to 8 hours.
timeout(time: 8, unit: 'HOURS')
timestamps()
}
environment {
SCALA_VERSION=3.0
}
steps {
doValidation()
doTest(env)
tryStreamsArchetype()
}
}

stage('JDK 11 and Scala 2.12') {
when {
not { changeRequest() }
Expand Down
58 changes: 35 additions & 23 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ allprojects {
libs.javassist,
// ensure we have a single version in the classpath despite transitive dependencies
libs.scalaLibrary,
libs.scalaReflect,
libs.scalaCompiler,
libs.jacksonAnnotations,
// be explicit about the Netty dependency version instead of relying on the version set by
// ZooKeeper (potentially older and containing CVEs)
Expand Down Expand Up @@ -562,27 +562,33 @@ subprojects {

tasks.withType(ScalaCompile) {
scalaCompileOptions.additionalParameters = [
"-deprecation",
"-unchecked",
"-encoding", "utf8",
"-Xlog-reflective-calls",
"-feature",
"-language:postfixOps",
"-language:implicitConversions",
"-language:existentials",
"-Xlint:constant",
"-Xlint:delayedinit-select",
"-Xlint:doc-detached",
"-Xlint:missing-interpolator",
"-Xlint:nullary-unit",
"-Xlint:option-implicit",
"-Xlint:package-object-classes",
"-Xlint:poly-implicit-overload",
"-Xlint:private-shadow",
"-Xlint:stars-align",
"-Xlint:type-parameter-shadow",
"-Xlint:unused"
"-unchecked",
"-encoding",
"UTF-8",
"-feature",
"-language:postfixOps",
"-language:implicitConversions"
]
if (versions.baseScala == '2.13') {
scalaCompileOptions.additionalParameters += [
"-deprecation",
"-Xlog-reflective-calls",
"-language:existentials",
"-Xlint:constant",
"-Xlint:delayedinit-select",
"-Xlint:doc-detached",
"-Xlint:missing-interpolator",
"-Xlint:nullary-unit",
"-Xlint:option-implicit",
"-Xlint:package-object-classes",
"-Xlint:poly-implicit-overload",
"-Xlint:private-shadow",
"-Xlint:stars-align",
"-Xlint:type-parameter-shadow",
"-Ytasty-reader",
"-Xlint:unused"
]
}

// See README.md for details on this option and the meaning of each value
if (userScalaOptimizerMode.equals("method"))
Expand All @@ -598,12 +604,16 @@ subprojects {
scalaCompileOptions.additionalParameters += inlineFrom
}

if (versions.baseScala != '2.12') {
if (versions.baseScala == '2.13') {
scalaCompileOptions.additionalParameters += ["-opt-warnings", "-Xlint:strict-unsealed-patmat"]
// Scala 2.13.2 introduces compiler warnings suppression, which is a pre-requisite for -Xfatal-warnings
scalaCompileOptions.additionalParameters += ["-Xfatal-warnings"]
}

if (versions.baseScala == '3') {
scalaCompileOptions.additionalParameters += ["-source:3.0-migration"]
}

// these options are valid for Scala versions < 2.13 only
// Scala 2.13 removes them, see https://github.com/scala/scala/pull/6502 and https://github.com/scala/scala/pull/5969
if (versions.baseScala == '2.12') {
Expand Down Expand Up @@ -817,6 +827,7 @@ project(':core') {
// even though the `core` module doesn't expose any public API
api project(':clients')
api libs.scalaLibrary
api libs.scalaCompiler

implementation project(':server-common')
implementation project(':metadata')
Expand All @@ -833,7 +844,6 @@ project(':core') {
implementation libs.scalaCollectionCompat
implementation libs.scalaJava8Compat
// only needed transitively, but set it explicitly to ensure it has the same version as scala-library
implementation libs.scalaReflect
implementation libs.scalaLogging
implementation libs.slf4jApi
implementation(libs.zookeeper) {
Expand Down Expand Up @@ -1868,6 +1878,7 @@ project(':streams:streams-scala') {
api project(':streams')

api libs.scalaLibrary
api libs.scalaCompiler
api libs.scalaCollectionCompat

testImplementation project(':core')
Expand Down Expand Up @@ -2197,6 +2208,7 @@ project(':jmh-benchmarks') {
implementation libs.slf4jlog4j
implementation libs.scalaLibrary
implementation libs.scalaJava8Compat
implementation libs.scalaCompiler
}

tasks.withType(JavaCompile) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package kafka.server.builders;

import kafka.api.ApiVersion;
import kafka.api.ApiVersion$;
import kafka.log.CleanerConfig;
import kafka.log.LogConfig;
import kafka.log.LogManager;
Expand Down Expand Up @@ -45,7 +46,7 @@ public class LogManagerBuilder {
private long flushStartOffsetCheckpointMs = 10000L;
private long retentionCheckMs = 1000L;
private int maxPidExpirationMs = 60000;
private ApiVersion interBrokerProtocolVersion = ApiVersion.latestVersion();
private ApiVersion interBrokerProtocolVersion = ApiVersion$.MODULE$.latestVersion();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has been solved in upcoming Scala versions; the bug fix is here: scala/scala3#13572

private Scheduler scheduler = null;
private BrokerTopicStats brokerTopicStats = null;
private LogDirFailureChannel logDirFailureChannel = null;
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1036,8 +1036,8 @@ object ConsumerGroupCommand extends Logging {
val timeoutMsOpt = parser.accepts("timeout", TimeoutMsDoc)
.withRequiredArg
.describedAs("timeout (ms)")
.ofType(classOf[Long])
.defaultsTo(5000)
.ofType(classOf[java.lang.Long])
.defaultsTo(5000L)
Comment on lines +1039 to +1040
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes of this type are due to this bug: scala/scala3#13630

val commandConfigOpt = parser.accepts("command-config", CommandConfigDoc)
.withRequiredArg
.describedAs("command config property file")
Expand Down
38 changes: 19 additions & 19 deletions core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -238,14 +238,14 @@ object ReassignPartitionsCommand extends Logging {
executeAssignment(adminClient,
opts.options.has(opts.additionalOpt),
Utils.readFileAsString(opts.options.valueOf(opts.reassignmentJsonFileOpt)),
opts.options.valueOf(opts.interBrokerThrottleOpt),
opts.options.valueOf(opts.replicaAlterLogDirsThrottleOpt),
opts.options.valueOf(opts.timeoutOpt))
opts.options.valueOf(opts.interBrokerThrottleOpt).longValue(),
opts.options.valueOf(opts.replicaAlterLogDirsThrottleOpt).longValue(),
opts.options.valueOf(opts.timeoutOpt).longValue())
Comment on lines +241 to +243
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sometimes Scala 3 can't automatically widen Ints to Longs, so the conversion has to be made explicit with longValue()

} else if (opts.options.has(opts.cancelOpt)) {
cancelAssignment(adminClient,
Utils.readFileAsString(opts.options.valueOf(opts.reassignmentJsonFileOpt)),
opts.options.has(opts.preserveThrottlesOpt),
opts.options.valueOf(opts.timeoutOpt))
opts.options.valueOf(opts.timeoutOpt).longValue())
} else if (opts.options.has(opts.listOpt)) {
listReassignments(adminClient)
} else {
Expand Down Expand Up @@ -1473,25 +1473,25 @@ object ReassignPartitionsCommand extends Logging {
.ofType(classOf[String])
val disableRackAware = parser.accepts("disable-rack-aware", "Disable rack aware replica assignment")
val interBrokerThrottleOpt = parser.accepts("throttle", "The movement of partitions between brokers will be throttled to this value (bytes/sec). " +
"This option can be included with --execute when a reassignment is started, and it can be altered by resubmitting the current reassignment " +
"along with the --additional flag. The throttle rate should be at least 1 KB/s.")
.withRequiredArg()
.describedAs("throttle")
.ofType(classOf[Long])
.defaultsTo(-1)
"This option can be included with --execute when a reassignment is started, and it can be altered by resubmitting the current reassignment " +
"along with the --additional flag. The throttle rate should be at least 1 KB/s.")
.withRequiredArg()
.describedAs("throttle")
.ofType(classOf[java.lang.Long])
.defaultsTo(-1L)
val replicaAlterLogDirsThrottleOpt = parser.accepts("replica-alter-log-dirs-throttle",
"The movement of replicas between log directories on the same broker will be throttled to this value (bytes/sec). " +
"This option can be included with --execute when a reassignment is started, and it can be altered by resubmitting the current reassignment " +
"along with the --additional flag. The throttle rate should be at least 1 KB/s.")
.withRequiredArg()
.describedAs("replicaAlterLogDirsThrottle")
.ofType(classOf[Long])
.defaultsTo(-1)
"The movement of replicas between log directories on the same broker will be throttled to this value (bytes/sec). " +
"This option can be included with --execute when a reassignment is started, and it can be altered by resubmitting the current reassignment " +
"along with the --additional flag. The throttle rate should be at least 1 KB/s.")
.withRequiredArg()
.describedAs("replicaAlterLogDirsThrottle")
.ofType(classOf[java.lang.Long])
.defaultsTo(-1L)
val timeoutOpt = parser.accepts("timeout", "The maximum time in ms to wait for log directory replica assignment to begin.")
.withRequiredArg()
.describedAs("timeout")
.ofType(classOf[Long])
.defaultsTo(10000)
.ofType(classOf[java.lang.Long])
.defaultsTo(10000L)
val additionalOpt = parser.accepts("additional", "Execute this reassignment in addition to any " +
"other ongoing ones. This option can also be used to change the throttle of an ongoing reassignment.")
val preserveThrottlesOpt = parser.accepts("preserve-throttles", "Do not modify broker or topic throttles.")
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/admin/TopicCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.{Collections, Properties}
import joptsimple._
import kafka.common.AdminCommandFailedException
import kafka.log.LogConfig
import kafka.utils._
import kafka.utils.{immutable => _, _}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes like this are due to shadowing between kafka.utils.immutable and the immutable package in scala.collection.

import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.CreatePartitionsOptions
import org.apache.kafka.clients.admin.CreateTopicsOptions
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ class ZkSecurityMigrator(zkClient: KafkaZkClient) extends Logging {
}
future match {
case Some(a) =>
Await.result(a, 6000 millis)
Await.result(a, 6000.millis)
futures.synchronized { futures.dequeue() }
recurse()
case None =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ class ZkPartitionStateMachine(config: KafkaConfig,
} else {
val (logConfigs, failed) = zkClient.getLogConfigs(
partitionsWithNoLiveInSyncReplicas.iterator.map { case (partition, _) => partition.topic }.toSet,
config.originals()
config.originals
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes like this are for parameterless methods, which should be called without parentheses

)

partitionsWithNoLiveInSyncReplicas.map { case (partition, leaderAndIsr) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,6 @@

package kafka.coordinator.group

import java.io.PrintStream
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.Optional
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}

import com.yammer.metrics.core.Gauge
import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0, KAFKA_2_1_IV0, KAFKA_2_1_IV1, KAFKA_2_3_IV0}
import kafka.common.OffsetAndMetadata
Expand All @@ -33,8 +25,8 @@ import kafka.log.AppendOrigin
import kafka.metrics.KafkaMetricsGroup
import kafka.server.{FetchLogEnd, ReplicaManager, RequestLocal}
import kafka.utils.CoreUtils.inLock
import kafka.utils.{immutable => _, _}
import kafka.utils.Implicits._
import kafka.utils._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.internals.Topic
Expand All @@ -48,6 +40,13 @@ import org.apache.kafka.common.requests.{OffsetCommitRequest, OffsetFetchRespons
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, MessageFormatter, TopicPartition}

import java.io.PrintStream
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.Optional
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import scala.collection._
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/log/LazyIndex.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.io.File
import java.nio.file.{Files, NoSuchFileException}
import java.util.concurrent.locks.ReentrantLock

import LazyIndex._

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This import was causing a cyclic-reference problem in Scala 3.

import kafka.utils.CoreUtils.inLock
import kafka.utils.threadsafe
import org.apache.kafka.common.utils.Utils
Expand All @@ -46,8 +46,8 @@ import org.apache.kafka.common.utils.Utils
* `AbstractIndex` instance.
*/
@threadsafe
class LazyIndex[T <: AbstractIndex] private (@volatile private var indexWrapper: IndexWrapper, loadIndex: File => T) {

class LazyIndex[T <: AbstractIndex] private (@volatile private var indexWrapper: LazyIndex.IndexWrapper, loadIndex: File => T) {
import LazyIndex._
private val lock = new ReentrantLock()

def file: File = indexWrapper.file
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/LogConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String]
}

def overriddenConfigsAsLoggableString: String = {
val overriddenTopicProps = props.asScala.collect {
val overriddenTopicProps: Map[String, Object] = props.asScala.collect {
case (k: String, v) if overriddenConfigs.contains(k) => (k, v.asInstanceOf[AnyRef])
}
ConfigUtils.configMapToRedactedString(overriddenTopicProps.asJava, configDef)
Expand Down
Loading