Spark DataFrames in Scala (2)
Spark DataFrames in Scala.
Note: if you don't release memory now and then, you may see strange behavior.
The iris dataset is used throughout.
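On the memory note above: one way to avoid lingering cached data is to release it explicitly. A minimal sketch, assuming a spark-shell session where `spark` is the active `SparkSession` and `filePass` holds the CSV path (this is an illustration, not part of the original walkthrough):

```scala
// Sketch: explicitly releasing cached DataFrames (assumes SparkSession `spark`)
val df = spark.read.format("csv").option("header", "true").load(filePass)
df.cache()                  // pin the data in executor memory while reusing df
// ... transformations ...
df.unpersist()              // release this DataFrame's cached blocks
spark.catalog.clearCache()  // or drop everything cached in this session at once
```

`unpersist()` frees only the one DataFrame; `spark.catalog.clearCache()` is the blunter tool that clears the whole session cache.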
Imports used throughout (`spark.implicits._` enables the `$"col"` syntax):

```scala
import java.io.File
import org.apache.spark.sql.functions._
import spark.implicits._
```

```scala
// Read the CSV. With option("inferSchema", "true"), Spark infers column types automatically.
var df = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load(filePass)
```

```
+-----------+----------+-----------+----------+-----------+
|SepalLength|SepalWidth|PetalLength|PetalWidth|       Name|
+-----------+----------+-----------+----------+-----------+
|        5.1|       3.5|        1.4|       0.2|Iris-setosa|
|        4.9|       3.0|        1.4|       0.2|Iris-setosa|
|        4.7|       3.2|        1.3|       0.2|Iris-setosa|
+-----------+----------+-----------+----------+-----------+
```

```scala
// Derive a name from the file name (strip the .csv extension)
val fileName = new File(filePass).getName.replace(".csv", "")

// Rename a column (here the column at index 1, SepalWidth, becomes "iris")
df = df.withColumnRenamed(df.columns(1), fileName)
```

```
+-----------+----+-----------+----------+-----------+
|SepalLength|iris|PetalLength|PetalWidth|       Name|
+-----------+----+-----------+----------+-----------+
|        5.1| 3.5|        1.4|       0.2|Iris-setosa|
|        4.9| 3.0|        1.4|       0.2|Iris-setosa|
|        4.7| 3.2|        1.3|       0.2|Iris-setosa|
+-----------+----+-----------+----------+-----------+
```

```scala
// Drop a column
df = df.drop("SepalLength")
```

```
+----+-----------+----------+-----------+
|iris|PetalLength|PetalWidth|       Name|
+----+-----------+----------+-----------+
| 3.5|        1.4|       0.2|Iris-setosa|
| 3.0|        1.4|       0.2|Iris-setosa|
| 3.2|        1.3|       0.2|Iris-setosa|
+----+-----------+----------+-----------+
```

```scala
// Concatenate columns as strings, separated by "-"
val df_connect = df.withColumn("idx", concat_ws("-", $"PetalLength", $"PetalWidth"))
```

```
+----+-----------+----------+-----------+-------+
|iris|PetalLength|PetalWidth|       Name|    idx|
+----+-----------+----------+-----------+-------+
| 3.5|        1.4|       0.2|Iris-setosa|1.4-0.2|
| 3.0|        1.4|       0.2|Iris-setosa|1.4-0.2|
| 3.2|        1.3|       0.2|Iris-setosa|1.3-0.2|
+----+-----------+----------+-----------+-------+
```

```scala
// Arithmetic on columns
val df_connect2 = df.withColumn("idx2", $"PetalLength" * 100 + $"PetalWidth")
```

```
+----+-----------+----------+-----------+-----+
|iris|PetalLength|PetalWidth|       Name| idx2|
+----+-----------+----------+-----------+-----+
| 3.5|        1.4|       0.2|Iris-setosa|140.2|
| 3.0|        1.4|       0.2|Iris-setosa|140.2|
| 3.2|        1.3|       0.2|Iris-setosa|130.2|
+----+-----------+----------+-----------+-----+
```

```scala
// Select columns
val df1 = df.select("iris", "PetalLength", "PetalWidth")
```

```
+----+-----------+----------+
|iris|PetalLength|PetalWidth|
+----+-----------+----------+
| 3.5|        1.4|       0.2|
| 3.0|        1.4|       0.2|
| 3.2|        1.3|       0.2|
+----+-----------+----------+
```

```scala
val df2 = df.select("iris", "PetalLength", "Name")

// Join two DataFrames on keys.
// The keys are listed in the Seq; multiple keys are allowed.
// Available join types: "inner", "outer", "full", "full_outer", "left",
// "left_outer", "right", "right_outer", "left_semi", "left_anti".
// See the reference below for details on each join type.
val join_df = df1.join(df2, Seq("iris", "PetalLength"), joinType = "outer")
```

```
+----+-----------+----------+---------------+
|iris|PetalLength|PetalWidth|           Name|
+----+-----------+----------+---------------+
| 2.7|        4.1|       1.0|Iris-versicolor|
| 2.9|        4.5|       1.5|Iris-versicolor|
| 2.5|        3.9|       1.1|Iris-versicolor|
+----+-----------+----------+---------------+
```
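The difference between `"inner"` and `"outer"` above can be sketched in plain Scala, without Spark: an inner join keeps only keys present on both sides, while an outer join keeps keys from either side (Spark fills the missing side with null). The data and names below are hypothetical, modeled on the `Seq("iris", "PetalLength")` join keys:

```scala
// Plain-Scala sketch of inner vs. outer join semantics over keyed rows.
// The (Double, Double) key plays the role of Seq("iris", "PetalLength").
object JoinSketch {
  type Key = (Double, Double) // (iris, PetalLength)

  // Left side carries PetalWidth, right side carries Name (hypothetical rows)
  val left: Map[Key, Double] =
    Map((2.7, 4.1) -> 1.0, (2.9, 4.5) -> 1.5)
  val right: Map[Key, String] =
    Map((2.7, 4.1) -> "Iris-versicolor", (2.5, 3.9) -> "Iris-versicolor")

  // inner: only keys present on both sides
  def innerKeys: Set[Key] = left.keySet intersect right.keySet
  // outer: keys from either side; the absent side would be null in Spark
  def outerKeys: Set[Key] = left.keySet union right.keySet

  def main(args: Array[String]): Unit = {
    println(s"inner keys: $innerKeys") // one shared key
    println(s"outer keys: $outerKeys") // all three distinct keys
  }
}
```

Only `(2.7, 4.1)` appears on both sides, so an inner join yields one row, while the outer join yields three, with nulls where one side had no match.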
References
scala - What are the various join types in Spark? - Stack Overflow