-
Notifications
You must be signed in to change notification settings - Fork 14.4k
Scala3 migration #11350
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Scala3 migration #11350
Conversation
Ping @ijuma I was able to have some progress with migrating to Scala3. I still need to check why some test failures occur only on Scala3. Let me know what you think about the changes. |
@@ -22,7 +22,7 @@ import java.util.{Collections, Properties} | |||
import joptsimple._ | |||
import kafka.common.AdminCommandFailedException | |||
import kafka.log.LogConfig | |||
import kafka.utils._ | |||
import kafka.utils.{immutable => _, _} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changes like this are due to shadowing between kafka.utils.immutable
and the immutable
package in scala.collections
.
@@ -477,7 +477,7 @@ class ZkPartitionStateMachine(config: KafkaConfig, | |||
} else { | |||
val (logConfigs, failed) = zkClient.getLogConfigs( | |||
partitionsWithNoLiveInSyncReplicas.iterator.map { case (partition, _) => partition.topic }.toSet, | |||
config.originals() | |||
config.originals |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changes like this are the parenthesis-less methods that should be called without parenthesis
@@ -21,7 +21,7 @@ import java.io.File | |||
import java.nio.file.{Files, NoSuchFileException} | |||
import java.util.concurrent.locks.ReentrantLock | |||
|
|||
import LazyIndex._ | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This import was causing a cyclic problem in Scala3.
@@ -467,1251 +451,3 @@ object SocketServer { | |||
|
|||
val ListenerReconfigurableConfigs = Set(KafkaConfig.MaxConnectionsProp, KafkaConfig.MaxConnectionCreationRateProp) | |||
} | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is what I mentioned about splitting classes present in SocketServer
into their own file
@@ -42,11 +42,11 @@ final class KafkaMetadataLog private ( | |||
// Access to this object needs to be synchronized because it is used by the snapshotting thread to notify the | |||
// polling thread when snapshots are created. This object is also used to store any opened snapshot reader. | |||
snapshots: mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]], | |||
topicPartition: TopicPartition, | |||
topicPartitionArg: TopicPartition, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changes like this are to avoid the shadowing between the parameter and the method
@@ -27,7 +27,7 @@ import com.typesafe.scalalogging.LazyLogging | |||
import joptsimple._ | |||
import kafka.utils.Implicits._ | |||
import kafka.utils.{Exit, _} | |||
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerRecord, KafkaConsumer} | |||
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig => ClientConsumerConfig, ConsumerRecord, KafkaConsumer} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is done to avoid shadowing
@@ -85,13 +85,13 @@ object DecodeJson { | |||
else decodeJson.decodeEither(node).map(Some(_)) | |||
} | |||
|
|||
implicit def decodeSeq[E, S[+T] <: Seq[E]](implicit decodeJson: DecodeJson[E], factory: Factory[E, S[E]]): DecodeJson[S[E]] = (node: JsonNode) => { | |||
implicit def decodeSeq[E, S[E] <: Seq[E]](implicit decodeJson: DecodeJson[E], factory: Factory[E, S[E]]): DecodeJson[S[E]] = (node: JsonNode) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Scala3 compiler is more strict with type definitions and the previous one wasn't really being satisfied
@@ -234,7 +234,7 @@ public KafkaClusterTestKit build() throws Exception { | |||
Option.apply(threadNamePrefix), | |||
JavaConverters.asScalaBuffer(Collections.<String>emptyList()).toSeq(), | |||
connectFutureManager.future, | |||
Server.SUPPORTED_FEATURES() | |||
Server$.MODULE$.SUPPORTED_FEATURES() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changes like this are related to scala/scala3#13572
@@ -2244,7 +2244,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { | |||
def testDescribeClusterClusterAuthorizedOperationsWithoutDescribeCluster(): Unit = { | |||
removeAllClientAcls() | |||
|
|||
for (version <- ApiKeys.DESCRIBE_CLUSTER.oldestVersion to ApiKeys.DESCRIBE_CLUSTER.latestVersion) { | |||
for (version <- ApiKeys.DESCRIBE_CLUSTER.oldestVersion.toInt to ApiKeys.DESCRIBE_CLUSTER.latestVersion.toInt) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The to
method is not present in Short
type and Scala3 doesn't widen the type automatically
var e = assertThrows(classOf[ExecutionException], () => { | ||
alterResult.values.get(topic1).get | ||
() | ||
}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change is because of scala/scala3#13549
One of tus failures was https://issues.apache.org/jira/browse/KAFKA-8785 |
Ignore the others
… instead of the most specific.
Scala3 doesn't convert that freely Int's to Long's anymore
@@ -45,7 +46,7 @@ | |||
private long flushStartOffsetCheckpointMs = 10000L; | |||
private long retentionCheckMs = 1000L; | |||
private int maxPidExpirationMs = 60000; | |||
private ApiVersion interBrokerProtocolVersion = ApiVersion.latestVersion(); | |||
private ApiVersion interBrokerProtocolVersion = ApiVersion$.MODULE$.latestVersion(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has been solved in upcoming Scala versions, Bug fix here: scala/scala3#13572
.ofType(classOf[java.lang.Long]) | ||
.defaultsTo(5000L) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This type of changes are due to this bug: scala/scala3#13630
opts.options.valueOf(opts.interBrokerThrottleOpt).longValue(), | ||
opts.options.valueOf(opts.replicaAlterLogDirsThrottleOpt).longValue(), | ||
opts.options.valueOf(opts.timeoutOpt).longValue()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some times Scala 3 can't automatically widen Ints to Longs
@@ -2374,7 +2374,7 @@ class GroupMetadataManagerTest { | |||
minOneMessage = EasyMock.eq(true))) | |||
.andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), mockRecords)) | |||
EasyMock.expect(replicaManager.getLog(groupMetadataTopicPartition)).andStubReturn(Some(logMock)) | |||
EasyMock.expect(replicaManager.getLogEndOffset(groupMetadataTopicPartition)).andStubReturn(Some(18)) | |||
EasyMock.expect(replicaManager.getLogEndOffset(groupMetadataTopicPartition)).andStubReturn(Some(18L)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is some strange discrepancy in Scala 3 compiler, need to find the cause and report it. Even though the impact on the code base is non-existent
class MockProducerIdManager(val brokerId: Int, val idStart: Long, val idLen: Int) | ||
extends RPCProducerIdManager(brokerId, () => 1, brokerToController, 100) { | ||
|
||
var startId = idStart | ||
var error: Errors = Errors.NONE | ||
|
||
override private[transaction] def sendRequest(): Unit = { | ||
if (error == Errors.NONE) { | ||
handleAllocateProducerIdsResponse(new AllocateProducerIdsResponse( | ||
new AllocateProducerIdsResponseData().setProducerIdStart(idStart).setProducerIdLen(idLen))) | ||
idStart += idLen | ||
new AllocateProducerIdsResponseData().setProducerIdStart(startId).setProducerIdLen(idLen))) | ||
startId += idLen |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These changes are due to this bug that is solved in upcoming Scala versions: scala/scala3#13630
<Match> | ||
<Field name="~.*\$lzy1"/> | ||
<Bug pattern="NM_FIELD_NAMING_CONVENTION"/> | ||
</Match> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This exception is due to the new naming convention for lazy fields in Scala 3. This is another false positive
<Match> | ||
<!-- This warning only appears during Scala 3 --> | ||
<Package name="kafka.server"/> | ||
<Source name="AbstractFetcherThread.scala"/> | ||
<Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"/> | ||
</Match> | ||
|
||
<Match> | ||
<!-- This warning only appears during Scala 3 --> | ||
<Package name="kafka.log"/> | ||
<Source name="LogLoader.scala"/> | ||
<Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"/> | ||
</Match> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These 2 changes seem to me like a false positive when compiling in Scala 3
<Match> | ||
<!-- This warning only appears during Scala 3 --> | ||
<Package name="kafka.coordinator.transaction"/> | ||
<Source name="ProducerIdManager.scala"/> | ||
<Bug pattern="IS2_INCONSISTENT_SYNC"/> | ||
</Match> | ||
|
||
<Match> | ||
<!-- This warning only appears during Scala 3 --> | ||
<Package name="kafka.network"/> | ||
<Source name="Acceptor.scala"/> | ||
<Bug pattern="IS2_INCONSISTENT_SYNC"/> | ||
</Match> | ||
|
||
<Match> | ||
<!-- This warning only appears during Scala 3 --> | ||
<Package name="kafka.utils"/> | ||
<Source name="KafkaScheduler.scala"/> | ||
<Bug pattern="IS2_INCONSISTENT_SYNC"/> | ||
</Match> | ||
|
||
<Match> | ||
<!-- This warning only appears during Scala 3 --> | ||
<Package name="kafka.network"/> | ||
<Source name="SocketServer.scala"/> | ||
<Bug pattern="IS2_INCONSISTENT_SYNC"/> | ||
</Match> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This might be a bug in bytecode generation in regards of synchronized blocks. Need to reproduce this and send a bug report.
<Match> | ||
<!-- This warning only appears during Scala 3 --> | ||
<Package name="kafka.network"/> | ||
<Source name="ConnectionQuotas.scala"/> | ||
<Bug pattern="VO_VOLATILE_INCREMENT"/> | ||
</Match> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This might be a bug in bytecode generation in regards of volatile variables. Need to reproduce this and send a bug report.
<Match> | ||
<!-- This warning only appears during Scala 3 --> | ||
<Package name="kafka.tools"/> | ||
<Source name="ConsoleConsumer.scala"/> | ||
<Bug pattern="DB_DUPLICATE_BRANCHES"/> | ||
</Match> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seem to me like a false positive when compiling in Scala 3
Current status of the PR:
In this PR there are several types of changes: I propose perform the Scala 3 migration in several steps:
What are your thoughts? @ijuma |
Any plans to prioritize the Scala 3 migration? Would be nice to see this happening. |
This PR lays the groundwork for the Scala3 migration of the code base. Gradle's Scala3 plugin is merged but not released yet (see PR).
During the migration I encountered 2 different bugs in Scala3, they are filed here and here.
Types of changes done in this PR:
SockerServer
extra classes were split into different files as Scala3 compiler was failing to find them when referenced in other classes in this same file.foreach
java collections method was not directly usable under Scala3, so I transformed it to the Scala collection's one (this might be another bug in Scala3, I need to investigate)Compiling with Scala3
In order to test this locally one can run the following:
./gradlew wrapper --gradle-distribution-url=https://services.gradle.org/distributions-snapshots/gradle-7.3-20210906222431+0000-bin.zip
And then the usual
./gradlew compileTestScala -PscalaVersion=3.0
Notes
Jackson is using "2.13.0-rc2" version as it's the one that contains Scala3 improvements, it's not really needed to successfully compile though.
Extra information, Scala3 is compiling in "Migration Mode", meaning it outputs some warnings about deprecated and dropped features. See Migration Mode for further info.
All these warnings can be automatically fixed by the Scala compiler itself.
Current Problems
Spotbugs is currently detecting 30 problems with Scala3, it works fine when compiling with Scala 2.13. This currently blocks the execution of core and streams tests. By excluding
spotbugs
tests can be run and some tests are still failing, I need to find out why is this. Tests run successfully in Scala 2.13. To excludespotbugs
run the following:./gradlew test -x spotbugsMain -PscalaVersion=3.0
Committer Checklist (excluded from commit message)