ScalaでSparkのDataframe(一部Dataset)
列の追加に癖がある。
単純に列指定で追加するのが難しいようだ。
import org.apache.spark.sql.SparkSession //Dataset 化 val spark = SparkSession.builder().appName("Spark SQL basic example").config("spark.some.config.option", "some-value").master("local").getOrCreate() import spark.implicits._ val data = (0 to 10).toDS() data.show() //result +-----+ |value| +-----+ | 0| | 1| | 2| | 3| | 4| | 5| | 6| | 7| | 8| | 9| | 10| +-----+ //フィルター data.filter($"value" >5).show() //result +-----+ |value| +-----+ | 6| | 7| | 8| | 9| | 10| +-----+ //DataSetからRDDへ val rdd = data.rdd //RDDのプリント print(rdd.collect()) //RDDのプリント②(行表示:非推奨) rdd.collect().foreach(println) //Arrayへ val base = data.collect() //2倍してArrayに戻す val array_maltipled = data.map(_ *2 ).collect() //5足してArrayへ val array_plus5 = data.map(_ +5 ).collect() //複数列Dataframe作成 //型の定義関数 case class Row(i: Double, j: Double, k: Double) //組合せの配列へ変換 val xs = Array(base,array_maltipled ,array_plus5 ).transpose //型の定義関数を利用してRDDを作成 val rdd = sc.parallelize(xs).map(ys => Row(ys(0), ys(1), ys(2))) //Dataframeへ変換 val df = rdd.toDF("base","maltipled2","plus5") //結果 +----+----------+-----+ |base|maltipled2|plus5| +----+----------+-----+ | 0.0| 0.0| 5.0| | 1.0| 2.0| 6.0| | 2.0| 4.0| 7.0| | 3.0| 6.0| 8.0| | 4.0| 8.0| 9.0| +----+----------+-----+ //カラム名の変更 val newname =Seq("b","m","p") val name_change = df.toDF(newname: _*) //result +---+---+---+ | b| m| p| +---+---+---+ |0.0|0.0|5.0| |1.0|2.0|6.0| |2.0|4.0|7.0| |3.0|6.0|8.0| |4.0|8.0|9.0| +---+---+---+ //インデックスの追加 import org.apache.spark.sql.functions._ val data_id = data_name.withColumn("id",monotonicallyIncreasingId) //result +---+---+---+---+ | b| m| p| id| +---+---+---+---+ |0.0|0.0|5.0| 0| |1.0|2.0|6.0| 1| |2.0|4.0|7.0| 2| |3.0|6.0|8.0| 3| |4.0|8.0|9.0| 4| +---+---+---+---+ //カラム名の取得 data_id.columns //列の追加 //idを振ってidで結合 val data2_id = data2.withColumn("id",monotonicallyIncreasingId) val data_con2 =data_id.join(data2_id,"id") //result +---+-------+-----+ | id|numbers|value| +---+-------+-----+ | 0| 0| 0| | 1| 1| 0| | 2| 2| 0| | 3| 3| 1| //列選択とArray化 data_con2.select($"value").collect()
参考
how to create DataFrame from multiple arrays in Spark Scala? - Stack Overflow
scala - Append a column to Data Frame in Apache Spark 1.3 - Stack Overflow
Join Two DataFrames without a Duplicated Column — Databricks Documentation