Veel big data frameworks zoals Spark en Kafka zijn geschreven in Scala waardoor ik soms niet de exacte werking weet van een stuk code. In Scala kun je op verschillende manieren programmeren (functioneel, object georiënteerd) dat je niet snel uitgeleerd bent. Zo kwam ik tijdens het gebruik van de Spark Cassandra connector een stuk code tegen dat me nieuwsgierig maakte naar de precieze werking.

Met behulp van de Spark Cassandra connector is het mogelijk om datasets vanuit Cassandra in Spark jobs te gebruiken en weer weg te schrijven in Cassandra. Je kunt bijvoorbeeld een dataset inladen, verschillende aggregaties maken en vervolgens de resultaat set opslaan in een nieuwe tabel.

Allereerst wordt er een Spark configuratie aangemaakt:

1
2
3
4
5
val conf = new SparkConf(true)
.set("spark.cassandra.connection.host", "192.168.123.10")
.set("spark.cassandra.auth.username", "cassandra")
.set("spark.cassandra.auth.password", "cassandra")
val sc = new SparkContext("local[*]", "test", conf)

Een import toegevoegd:

1
import com.datastax.spark.connector._

Hierdoor zijn er extra methodes beschikbaar gekomen op de SparkContext en de RDD’s:

1
2
val rdd = sc.cassandraTable("test", "words")
rdd.toArray.foreach(println)

Hoe is dit mogelijk?

De functies zijn zelfs tijdens het compileren beschikbaar?! In de object georiënteerde taal Java wordt o.a. inheritance (overerving) gebruikt om functionaliteit uit te breiden. Dat is vaak bruikbaar als het om eigen code gaat maar helaas minder bruikbaar als het om externe library’s gaat zoals in bovenstaand voorbeeld. Om deze reden heeft Scala een feature genaamd impliciete type conversies. (implicit type conversions)

Hoe werkt het precies?

Normaal gesproken zou de volgende code niet compileren:

1
val rdd = sc.cassandraTable("test", "words")

De scala compiler is echter zo slim om opzoek te gaan naar functies die een type conversie maken. Doordat we een import statement hebben toegevoegd wordt zo’n type conversies ingeladen:

1
2
3
4
5
package object connector {
implicit def toSparkContextFunctions(sc: SparkContext): SparkContextFunctions =
new SparkContextFunctions(sc)
...
}

En de SparkContextFuncties ziet er als volgt uit:

class SparkContextFunctions(@transient val sc: SparkContext) extends Serializable {
  
  def cassandraTable[T](keyspace: String, table: String)
                       (implicit connector: CassandraConnector = CassandraConnector(sc.getConf),
                        readConf: ReadConf = ReadConf.fromSparkConf(sc.getConf),
                        ct: ClassTag[T], rrf: RowReaderFactory[T],
                        ev: ValidRDDType[T]) =
    new CassandraTableScanRDD[T](sc, connector, keyspace, table, readConf = readConf)

}

Met behulp van impliciete type conversies is het dus mogelijk om externe library’s aan te passen. Het is een krachtig mechanisme om interfaces en methode's toe te kunnen voegen aan bestaande code. Gebruik het echter voorzichtig. De werking is niet direct zichtbaar en je moet naar de import statements kijken om te zien hoe de code precies werkt. En dat laatste doe je niet zo snel.