ScalaでSparkのDataframe②
ScalaでSpark Dataframe。
たまにメモリ解放しないと謎挙動示すので注意。
irisを使用
//CSVの読み込み。OptintionのInferSchema tureで型を自動認識する var df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(filePass) +-----------+----------+-----------+----------+-----------+ |SepalLength|SepalWidth|PetalLength|PetalWidth| Name| +-----------+----------+-----------+----------+-----------+ | 5.1| 3.5| 1.4| 0.2|Iris-setosa| | 4.9| 3.0| 1.4| 0.2|Iris-setosa| | 4.7| 3.2| 1.3| 0.2|Iris-setosa| +-----------+----------+-----------+----------+-----------+ //列名の取得 var fileName =new File(filePass).getName.replace(".csv","") //列名の置換 var df = df.withColumnRenamed(df.columns(1),fileName) +-----------+----+----+----------+-----------+ |SepalLength|iris|iris|PetalWidth| Name| +-----------+----+----+----------+-----------+ | 5.1| 3.5| 1.4| 0.2|Iris-setosa| | 4.9| 3.0| 1.4| 0.2|Iris-setosa| | 4.7| 3.2| 1.3| 0.2|Iris-setosa| +-----------+----+----+----------+-----------+ //列の除去 df = df.drop("SepalLength") +----+-----------+----------+-----------+ |iris|PetalLength|PetalWidth| Name| +----+-----------+----------+-----------+ | 3.5| 1.4| 0.2|Iris-setosa| | 3.0| 1.4| 0.2|Iris-setosa| | 3.2| 1.3| 0.2|Iris-setosa| +----+-----------+----------+-----------+ //列の文字列結合 df_connect = df.withColumn("idx",concat_ws("-",$"PetalLength" ,$"PetalWidth")) +----+-----------+----------+-----------+-------+ |iris|PetalLength|PetalWidth| Name| idx| +----+-----------+----------+-----------+-------+ | 3.5| 1.4| 0.2|Iris-setosa|1.4-0.2| | 3.0| 1.4| 0.2|Iris-setosa|1.4-0.2| | 3.2| 1.3| 0.2|Iris-setosa|1.3-0.2| +----+-----------+----------+-----------+-------+ //列の四則演算 val df_connect2 = df.withColumn("idx2",$"PetalLength"*100+$"PetalWidth") +----+-----------+----------+-----------+-----+ |iris|PetalLength|PetalWidth| Name| idx2| +----+-----------+----------+-----------+-----+ | 3.5| 1.4| 0.2|Iris-setosa|140.2| | 3.0| 1.4| 0.2|Iris-setosa|140.2| | 3.2| 1.3| 0.2|Iris-setosa|130.2| +----+-----------+----------+-----------+-----+ //列抽出 val df1 = df.select("iris","PetalLength","PetalWidth") +----+-----------+----------+ |iris|PetalLength|PetalWidth| +----+-----------+----------+ | 3.5| 1.4| 0.2| | 3.0| 1.4| 0.2| | 3.2| 1.3| 0.2| +----+-----------+----------+ val df2 = df.select("iris","PetalLength","name") //キーを使用したdataframeの結合 //キーはSeq内に記載。複数可能。 //結合法は、"inner", "outer", "full", "full_outer", "left", "left_outer", "right", "right_outer", "left_semi", "left_anti"がある。 //結合法の詳細は参考を参照 val join_df = df1.join(df2,Seq("iris","PetalLength"),joinType="outer") +----+-----------+----------+---------------+ |iris|PetalLength|PetalWidth| Name| +----+-----------+----------+---------------+ | 2.7| 4.1| 1.0|Iris-versicolor| | 2.9| 4.5| 1.5|Iris-versicolor| | 2.5| 3.9| 1.1|Iris-versicolor| +----+-----------+----------+---------------+
参考
scala - What are the various join types in Spark? - Stack Overflow
ScalaでSparkのDataframe(一部Dataset)
列の追加に癖がある。
単純に列指定で追加するのが難しいようだ。
import org.apache.spark.sql.SparkSession //Dataset 化 val spark = SparkSession.builder().appName("Spark SQL basic example").config("spark.some.config.option", "some-value").master("local").getOrCreate() import spark.implicits._ val data = (0 to 10).toDS() data.show() //result +-----+ |value| +-----+ | 0| | 1| | 2| | 3| | 4| | 5| | 6| | 7| | 8| | 9| | 10| +-----+ //フィルター data.filter($"value" >5).show() //result +-----+ |value| +-----+ | 6| | 7| | 8| | 9| | 10| +-----+ //DataSetからRDDへ val rdd = data.rdd //RDDのプリント print(rdd.collect()) //RDDのプリント②(行表示:非推奨) rdd.collect().foreach(println) //Arrayへ val base = data.collect() //2倍してArrayに戻す val array_maltipled = data.map(_ *2 ).collect() //5足してArrayへ val array_plus5 = data.map(_ +5 ).collect() //複数列Dataframe作成 //型の定義関数 case class Row(i: Double, j: Double, k: Double) //組合せの配列へ変換 val xs = Array(base,array_maltipled ,array_plus5 ).transpose //型の定義関数を利用してRDDを作成 val rdd = sc.parallelize(xs).map(ys => Row(ys(0), ys(1), ys(2))) //Dataframeへ変換 val df = rdd.toDF("base","maltipled2","plus5") //結果 +----+----------+-----+ |base|maltipled2|plus5| +----+----------+-----+ | 0.0| 0.0| 5.0| | 1.0| 2.0| 6.0| | 2.0| 4.0| 7.0| | 3.0| 6.0| 8.0| | 4.0| 8.0| 9.0| +----+----------+-----+ //カラム名の変更 val newname =Seq("b","m","p") val name_change = df.toDF(newname: _*) //result +---+---+---+ | b| m| p| +---+---+---+ |0.0|0.0|5.0| |1.0|2.0|6.0| |2.0|4.0|7.0| |3.0|6.0|8.0| |4.0|8.0|9.0| +---+---+---+ //インデックスの追加 import org.apache.spark.sql.functions._ val data_id = data_name.withColumn("id",monotonicallyIncreasingId) //result +---+---+---+---+ | b| m| p| id| +---+---+---+---+ |0.0|0.0|5.0| 0| |1.0|2.0|6.0| 1| |2.0|4.0|7.0| 2| |3.0|6.0|8.0| 3| |4.0|8.0|9.0| 4| +---+---+---+---+ //カラム名の取得 data_id.columns //列の追加 //idを振ってidで結合 val data2_id = data2.withColumn("id",monotonicallyIncreasingId) val data_con2 =data_id.join(data2_id,"id") //result +---+-------+-----+ | id|numbers|value| +---+-------+-----+ | 0| 0| 0| | 1| 1| 0| | 2| 2| 0| | 3| 3| 1| //列選択とArray化 data_con2.select($"value").collect()
参考
how to create DataFrame from multiple arrays in Spark Scala? - Stack Overflow
scala - Append a column to Data Frame in Apache Spark 1.3 - Stack Overflow
Join Two DataFrames without a Duplicated Column — Databricks Documentation
ScalaでSpark
適宜追加予定
//CSV読み込み scala> val df = spark.read .format("csv").option("header", "true").option("mode", "DROPMALFORMED").option("inferSchema","True").load("iris.csv") // 各列の型表示 scala> df.printSchema() //表示(デフォルトは20行) scala> df.show() //SQL用の仮テーブル名設定 scala> df.createGlobalTempView("iris") //SQLによる列の四則演算(テーブル名必要、.show()で結果出力) scala> spark.sql("SELECT SepalLength+SepalWidth FROM global_temp.iris2").show() //列名の取得と列名スライス scala> val df_columns = df.columns.slice(1,3) df_sep: Array[String] = Array(SepalWidth, PetalLength) //列方向のスライス(列番号スライス⇨抽出データの取得) scala> val df_columns = df.columns.slice(1,4) df_columns: Array[String] = Array(SepalWidth, PetalLength, PetalWidth) scala> df.select(df_columns.head,df_columns.tail:_*).show(3) +----------+-----------+----------+ |SepalWidth|PetalLength|PetalWidth| +----------+-----------+----------+ | 3.5| 1.4| 0.2| | 3.0| 1.4| 0.2| | 3.2| 1.3| 0.2| +----------+-----------+----------+ //行方向のスライス import org.apache.spark.sql.functions._ scala> val idxDf = df.withColumn("idx", monotonicallyIncreasingId()) scala> idxDf.show(3) +-----------+----------+-----------+----------+-----------+---+ |SepalLength|SepalWidth|PetalLength|PetalWidth| Name|idx| +-----------+----------+-----------+----------+-----------+---+ | 5.1| 3.5| 1.4| 0.2|Iris-setosa| 0| | 4.9| 3.0| 1.4| 0.2|Iris-setosa| 1| | 4.7| 3.2| 1.3| 0.2|Iris-setosa| 2| +-----------+----------+-----------+----------+-----------+---+ scala> val ex_Df1 = idxDf.filter("idx > 10") scala> val ex_Df = ex_Df1.filter("idx < 15") scala> ex_Df.show() +-----------+----------+-----------+----------+-----------+---+ |SepalLength|SepalWidth|PetalLength|PetalWidth| Name|idx| +-----------+----------+-----------+----------+-----------+---+ | 4.8| 3.4| 1.6| 0.2|Iris-setosa| 11| | 4.8| 3.0| 1.4| 0.1|Iris-setosa| 12| | 4.3| 3.0| 1.1| 0.1|Iris-setosa| 13| | 5.8| 4.0| 1.2| 0.2|Iris-setosa| 14| +-----------+----------+-----------+----------+-----------+---+
参考
python - Splitting DataFrames in Apache Spark - Stack Overflow
scala - Get a range of columns of Spark RDD - Stack Overflow
scala覚え書き
位置参照(スライス)に関して追記
1. ListやArrayの操作基本
scala> val test_array = (0 to 10).toArray test_array: Array[Int] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) //範囲抽出 scala> test_array.slice(3,7) res38: Array[Int] = Array(3, 4, 5, 6) //位置参照 scala> test_array(5) res52: Int = 5 //先頭取り出し scala> test_array.head res45: Int = 0 //最後尾取り出し scala> test_array.last res48: Int = 10 //先頭除去 scala> val tail_array = test_array.tail tail_array: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) //反転 scala> test_array.reverse res46: Array[Int] = Array(10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0) //最後尾の削除 scala> val temp = test_array.init res47: Array[Int] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9) //最前列追加 scala> -1+:test_array res51: Array[Int] = Array(-1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) //最後尾追加 scala> test_array :+ 11 res49: Array[Int] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11) //内容指定による要素除去 scala> val remove_test_array = test_array.filter{_!= 5} remove_test_array: Array[Int] = Array(0, 1, 2, 3, 4, 6, 7, 8, 9, 10)
2. build.sbtでjarのライブラリを追加する書式
(build.sbtと同一フォルダにcustom_libフォルダ作成してjarを入れる)
//build.sbt unmanagedBase := baseDirectory.value / "custom_lib"
3. build.sbtのspark対応
scalaのバージョン番号をsparkのライブラリ末尾に入れる必要がある。
インストールしたsparkのパスを通してから、
sparkのバージョンとmarvenのリポジトリに対応した記載にすること。
Maven Repository: spark
//build.sbt scalaVersion := "2.11.8" val sparkVersion = "2.3.0" libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % sparkVersion libraryDependencies += "org.apache.spark" % "spark-hive_2.11" % sparkVersion
適宜追加予定
tensorflowとcupyのwindows環境インストール(cuDNN、CUDA)(Linuxのみ2018/1/21時点の情報)
追記:WindowsはCUDA9.1未対応。Windowsは下記参照
cuDNN7でpython3.6とCUDA9.1に対応。インストールが簡単になった。
1. CUDAを下記から落としてインストール
CUDA (現時点では9.1が最新)
https://developer.nvidia.com/cuda-downloads
2. cuDNNを下記から落とす。
(7以上のwindows版を選択。CUDAのバージョンに揃える)
cuDNN
https://developer.nvidia.com/rdp/form/cudnn-download-survey
3. cuDNNを展開し下記フォルダに入れる
(末尾の数値はバージョンに合わせて適宜変更)
C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v9.1
4. 下記サイトでVisual C++ Build Tools 2015をインストール
Download the Visual C++ Build Tools (standalone C++ compiler, libraries and tools)
cuDNNはVS2017には対応していないので注意
5. 環境変数に下記を設定
(CUDAの後の番号はバージョンに応じて適宜変更)
INCLUDE
C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v9.1\include
C:\Program Files (x86)\Windows Kits\10\Include\10.0.10240.0\ucrt
C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v9.1\bin
PATH
C:\Program Files (x86)\Microsoft Visual Studio 14.0\VC\bin
6. pip install
pip install cupy
pip install tensorflow-gpu
RDkitのインストールとPycharm用の設定
tensorflowとcupyのwindows環境インストール(cuDNN、CUDA)(2017/10/27時点の情報)
1. CUDAを下記から落としてインストール
CUDA (9はwindowsで動かないので8台を選択) https://developer.nvidia.com/cuda-downloads
2. cuDNNを下記から落とす。
(6以上のwindows版を選択)
cuDNN
https://developer.nvidia.com/rdp/form/cudnn-download-survey
3. cuDNNを展開し下記フォルダに入れる
(末尾の数値はバージョンに合わせて適宜変更)
C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v8.0
4. 下記サイトでVisual C++ Build Tools 2015をインストール
Download the Visual C++ Build Tools (standalone C++ compiler, libraries and tools)
cuDNNはVS2017には対応していないので注意
5. 環境変数に下記を設定
(CUDAの後の番号はバージョンに応じて適宜変更)
INCLUDE
C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v8.0\include
C:\Program Files (x86)\Windows Kits\10\Include\10.0.10240.0\ucrt
C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v8.0\bin
PATH
C:\Program Files (x86)\Microsoft Visual Studio 14.0\VC\bin
6 Pythonを3.5.2 にダウングレード
Anaconda Prompt を起動してconda install python=3.5.2 と入力。
tensorflowが3.6に対応していないため。
7. pip install
pip install cupy
pip install tensorflow-gpu
まあ、Ubuntuが楽ですね、という話でした。