tata色々な備忘録

データ解析、画像解析、化学分析などなど

ScalaでSparkのDataframe(一部Dataset)

列の追加に癖がある。
単純に列指定で追加するのが難しいようだ。

import org.apache.spark.sql.SparkSession

//Dataset 化
val spark = SparkSession.builder().appName("Spark SQL basic example").config("spark.some.config.option", "some-value").master("local").getOrCreate()
import spark.implicits._
val data = (0 to 10).toDS()
data.show()
//result
+-----+
|value|
+-----+
|    0|
|    1|
|    2|
|    3|
|    4|
|    5|
|    6|
|    7|
|    8|
|    9|
|   10|
+-----+
//フィルター
data.filter($"value" >5).show()
//result
+-----+
|value|
+-----+
|    6|
|    7|
|    8|
|    9|
|   10|
+-----+
//DataSetからRDDへ
val rdd = data.rdd

//RDDのプリント
print(rdd.collect())
//RDDのプリント②(行表示:非推奨)
rdd.collect().foreach(println)
//Arrayへ
val base = data.collect()
//2倍してArrayに戻す
 val array_maltipled = data.map(_ *2 ).collect()
//5足してArrayへ
 val array_plus5 = data.map(_ +5 ).collect()

//複数列Dataframe作成
//型の定義関数
case class Row(i: Double, j: Double, k: Double)
//組合せの配列へ変換
val xs = Array(base,array_maltipled ,array_plus5 ).transpose
//型の定義関数を利用してRDDを作成
val rdd = sc.parallelize(xs).map(ys => Row(ys(0), ys(1), ys(2)))
//Dataframeへ変換
val df = rdd.toDF("base","maltipled2","plus5")
//結果
+----+----------+-----+
|base|maltipled2|plus5|
+----+----------+-----+
| 0.0|       0.0|  5.0|
| 1.0|       2.0|  6.0|
| 2.0|       4.0|  7.0|
| 3.0|       6.0|  8.0|
| 4.0|       8.0|  9.0|
+----+----------+-----+

//カラム名の変更
 val newname =Seq("b","m","p")
 val name_change = df.toDF(newname: _*)
//result
+---+---+---+
|  b|  m|  p|
+---+---+---+
|0.0|0.0|5.0|
|1.0|2.0|6.0|
|2.0|4.0|7.0|
|3.0|6.0|8.0|
|4.0|8.0|9.0|
+---+---+---+
//インデックスの追加
import org.apache.spark.sql.functions._
val data_id = data_name.withColumn("id",monotonicallyIncreasingId)
//result
+---+---+---+---+
|  b|  m|  p| id|
+---+---+---+---+
|0.0|0.0|5.0|  0|
|1.0|2.0|6.0|  1|
|2.0|4.0|7.0|  2|
|3.0|6.0|8.0|  3|
|4.0|8.0|9.0|  4|
+---+---+---+---+
//カラム名の取得
data_id.columns

//列の追加
//idを振ってidで結合
val data2_id = data2.withColumn("id",monotonicallyIncreasingId)
val data_con2 =data_id.join(data2_id,"id")
//result
+---+-------+-----+
| id|numbers|value|
+---+-------+-----+
|  0|      0|    0|
|  1|      1|    0|
|  2|      2|    0|
|  3|      3|    1|

//列選択とArray化
data_con2.select($"value").collect()

参考

how to create DataFrame from multiple arrays in Spark Scala? - Stack Overflow
scala - Append a column to Data Frame in Apache Spark 1.3 - Stack Overflow
Join Two DataFrames without a Duplicated Column — Databricks Documentation