tata色々な備忘録

データ解析、画像解析、化学分析などなど

ScalaでSparkのDataframe②

ScalaでSpark Dataframe。
たまにメモリ解放しないと謎挙動示すので注意。
irisを使用

//CSVの読み込み。OptintionのInferSchema tureで型を自動認識する
var df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(filePass)
+-----------+----------+-----------+----------+-----------+
|SepalLength|SepalWidth|PetalLength|PetalWidth|       Name|
+-----------+----------+-----------+----------+-----------+
|        5.1|       3.5|        1.4|       0.2|Iris-setosa|
|        4.9|       3.0|        1.4|       0.2|Iris-setosa|
|        4.7|       3.2|        1.3|       0.2|Iris-setosa|
+-----------+----------+-----------+----------+-----------+

//列名の取得
var fileName =new File(filePass).getName.replace(".csv","")
//列名の置換
var df = df.withColumnRenamed(df.columns(1),fileName)
+-----------+----+----+----------+-----------+
|SepalLength|iris|iris|PetalWidth|       Name|
+-----------+----+----+----------+-----------+
|        5.1| 3.5| 1.4|       0.2|Iris-setosa|
|        4.9| 3.0| 1.4|       0.2|Iris-setosa|
|        4.7| 3.2| 1.3|       0.2|Iris-setosa|
+-----------+----+----+----------+-----------+
//列の除去
df = df.drop("SepalLength")
+----+-----------+----------+-----------+
|iris|PetalLength|PetalWidth|       Name|
+----+-----------+----------+-----------+
| 3.5|        1.4|       0.2|Iris-setosa|
| 3.0|        1.4|       0.2|Iris-setosa|
| 3.2|        1.3|       0.2|Iris-setosa|
+----+-----------+----------+-----------+
//列の文字列結合
df_connect = df.withColumn("idx",concat_ws("-",$"PetalLength" ,$"PetalWidth"))
+----+-----------+----------+-----------+-------+
|iris|PetalLength|PetalWidth|       Name|    idx|
+----+-----------+----------+-----------+-------+
| 3.5|        1.4|       0.2|Iris-setosa|1.4-0.2|
| 3.0|        1.4|       0.2|Iris-setosa|1.4-0.2|
| 3.2|        1.3|       0.2|Iris-setosa|1.3-0.2|
+----+-----------+----------+-----------+-------+
//列の四則演算
val df_connect2 = df.withColumn("idx2",$"PetalLength"*100+$"PetalWidth")

+----+-----------+----------+-----------+-----+
|iris|PetalLength|PetalWidth|       Name| idx2|
+----+-----------+----------+-----------+-----+
| 3.5|        1.4|       0.2|Iris-setosa|140.2|
| 3.0|        1.4|       0.2|Iris-setosa|140.2|
| 3.2|        1.3|       0.2|Iris-setosa|130.2|
+----+-----------+----------+-----------+-----+
//列抽出
val df1 = df.select("iris","PetalLength","PetalWidth")
+----+-----------+----------+
|iris|PetalLength|PetalWidth|
+----+-----------+----------+
| 3.5|        1.4|       0.2|
| 3.0|        1.4|       0.2|
| 3.2|        1.3|       0.2|
+----+-----------+----------+
val df2 = df.select("iris","PetalLength","name")

//キーを使用したdataframeの結合
//キーはSeq内に記載。複数可能。
//結合法は、"inner", "outer", "full", "full_outer", "left", "left_outer", "right", "right_outer", "left_semi", "left_anti"がある。
//結合法の詳細は参考を参照
val join_df = df1.join(df2,Seq("iris","PetalLength"),joinType="outer")
+----+-----------+----------+---------------+
|iris|PetalLength|PetalWidth|           Name|
+----+-----------+----------+---------------+
| 2.7|        4.1|       1.0|Iris-versicolor|
| 2.9|        4.5|       1.5|Iris-versicolor|
| 2.5|        3.9|       1.1|Iris-versicolor|
+----+-----------+----------+---------------+

参考

scala - What are the various join types in Spark? - Stack Overflow

ScalaでSparkのDataframe(一部Dataset)

列の追加に癖がある。
単純に列指定で追加するのが難しいようだ。

import org.apache.spark.sql.SparkSession

//Dataset 化
val spark = SparkSession.builder().appName("Spark SQL basic example").config("spark.some.config.option", "some-value").master("local").getOrCreate()
import spark.implicits._
val data = (0 to 10).toDS()
data.show()
//result
+-----+
|value|
+-----+
|    0|
|    1|
|    2|
|    3|
|    4|
|    5|
|    6|
|    7|
|    8|
|    9|
|   10|
+-----+
//フィルター
data.filter($"value" >5).show()
//result
+-----+
|value|
+-----+
|    6|
|    7|
|    8|
|    9|
|   10|
+-----+
//DataSetからRDDへ
val rdd = data.rdd

//RDDのプリント
print(rdd.collect())
//RDDのプリント②(行表示:非推奨)
rdd.collect().foreach(println)
//Arrayへ
val base = data.collect()
//2倍してArrayに戻す
 val array_maltipled = data.map(_ *2 ).collect()
//5足してArrayへ
 val array_plus5 = data.map(_ +5 ).collect()

//複数列Dataframe作成
//型の定義関数
case class Row(i: Double, j: Double, k: Double)
//組合せの配列へ変換
val xs = Array(base,array_maltipled ,array_plus5 ).transpose
//型の定義関数を利用してRDDを作成
val rdd = sc.parallelize(xs).map(ys => Row(ys(0), ys(1), ys(2)))
//Dataframeへ変換
val df = rdd.toDF("base","maltipled2","plus5")
//結果
+----+----------+-----+
|base|maltipled2|plus5|
+----+----------+-----+
| 0.0|       0.0|  5.0|
| 1.0|       2.0|  6.0|
| 2.0|       4.0|  7.0|
| 3.0|       6.0|  8.0|
| 4.0|       8.0|  9.0|
+----+----------+-----+

//カラム名の変更
 val newname =Seq("b","m","p")
 val name_change = df.toDF(newname: _*)
//result
+---+---+---+
|  b|  m|  p|
+---+---+---+
|0.0|0.0|5.0|
|1.0|2.0|6.0|
|2.0|4.0|7.0|
|3.0|6.0|8.0|
|4.0|8.0|9.0|
+---+---+---+
//インデックスの追加
import org.apache.spark.sql.functions._
val data_id = data_name.withColumn("id",monotonicallyIncreasingId)
//result
+---+---+---+---+
|  b|  m|  p| id|
+---+---+---+---+
|0.0|0.0|5.0|  0|
|1.0|2.0|6.0|  1|
|2.0|4.0|7.0|  2|
|3.0|6.0|8.0|  3|
|4.0|8.0|9.0|  4|
+---+---+---+---+
//カラム名の取得
data_id.columns

//列の追加
//idを振ってidで結合
val data2_id = data2.withColumn("id",monotonicallyIncreasingId)
val data_con2 =data_id.join(data2_id,"id")
//result
+---+-------+-----+
| id|numbers|value|
+---+-------+-----+
|  0|      0|    0|
|  1|      1|    0|
|  2|      2|    0|
|  3|      3|    1|

//列選択とArray化
data_con2.select($"value").collect()

参考

how to create DataFrame from multiple arrays in Spark Scala? - Stack Overflow
scala - Append a column to Data Frame in Apache Spark 1.3 - Stack Overflow
Join Two DataFrames without a Duplicated Column — Databricks Documentation

ScalaでSpark

ScalaでSpark。pythonの方が多いが気にしない

適宜追加予定

//CSV読み込み
scala> val df = spark.read .format("csv").option("header", "true").option("mode", "DROPMALFORMED").option("inferSchema","True").load("iris.csv")

// 各列の型表示
scala> df.printSchema()

//表示(デフォルトは20行)
scala> df.show()

//SQL用の仮テーブル名設定
scala> df.createGlobalTempView("iris")

//SQLによる列の四則演算(テーブル名必要、.show()で結果出力)
scala> spark.sql("SELECT SepalLength+SepalWidth FROM global_temp.iris2").show()

//列名の取得と列名スライス
scala> val df_columns = df.columns.slice(1,3)
df_sep: Array[String] = Array(SepalWidth, PetalLength)

//列方向のスライス(列番号スライス⇨抽出データの取得)
scala> val df_columns = df.columns.slice(1,4)
df_columns: Array[String] = Array(SepalWidth, PetalLength, PetalWidth)
scala> df.select(df_columns.head,df_columns.tail:_*).show(3)
+----------+-----------+----------+
|SepalWidth|PetalLength|PetalWidth|
+----------+-----------+----------+
|       3.5|        1.4|       0.2|
|       3.0|        1.4|       0.2|
|       3.2|        1.3|       0.2|
+----------+-----------+----------+

//行方向のスライス
import org.apache.spark.sql.functions._ 
scala> val idxDf = df.withColumn("idx", monotonicallyIncreasingId())
scala>  idxDf.show(3)
+-----------+----------+-----------+----------+-----------+---+
|SepalLength|SepalWidth|PetalLength|PetalWidth|       Name|idx|
+-----------+----------+-----------+----------+-----------+---+
|        5.1|       3.5|        1.4|       0.2|Iris-setosa|  0|
|        4.9|       3.0|        1.4|       0.2|Iris-setosa|  1|
|        4.7|       3.2|        1.3|       0.2|Iris-setosa|  2|
+-----------+----------+-----------+----------+-----------+---+

scala> val ex_Df1 = idxDf.filter("idx > 10")
scala> val ex_Df = ex_Df1.filter("idx < 15")
scala> ex_Df.show()
+-----------+----------+-----------+----------+-----------+---+
|SepalLength|SepalWidth|PetalLength|PetalWidth|       Name|idx|
+-----------+----------+-----------+----------+-----------+---+
|        4.8|       3.4|        1.6|       0.2|Iris-setosa| 11|
|        4.8|       3.0|        1.4|       0.1|Iris-setosa| 12|
|        4.3|       3.0|        1.1|       0.1|Iris-setosa| 13|
|        5.8|       4.0|        1.2|       0.2|Iris-setosa| 14|
+-----------+----------+-----------+----------+-----------+---+

参考

python - Splitting DataFrames in Apache Spark - Stack Overflow

scala - Get a range of columns of Spark RDD - Stack Overflow

scala覚え書き

位置参照(スライス)に関して追記

1. ListやArrayの操作基本

scala> val test_array = (0 to 10).toArray
test_array: Array[Int] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

//範囲抽出
scala> test_array.slice(3,7)
res38: Array[Int] = Array(3, 4, 5, 6)

//位置参照
scala> test_array(5)
res52: Int = 5

//先頭取り出し
scala> test_array.head
res45: Int = 0

//最後尾取り出し
scala> test_array.last
res48: Int = 10

//先頭除去
scala> val tail_array = test_array.tail
tail_array: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

//反転
scala> test_array.reverse
res46: Array[Int] = Array(10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0)

//最後尾の削除
scala> val temp = test_array.init
res47: Array[Int] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)

//最前列追加
scala> -1+:test_array
res51: Array[Int] = Array(-1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

//最後尾追加
scala> test_array :+ 11
res49: Array[Int] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11)

//内容指定による要素除去
scala> val remove_test_array = test_array.filter{_!= 5}
remove_test_array: Array[Int] = Array(0, 1, 2, 3, 4, 6, 7, 8, 9, 10)

2. build.sbtでjarのライブラリを追加する書式
(build.sbtと同一フォルダにcustom_libフォルダ作成してjarを入れる)

//build.sbt
unmanagedBase := baseDirectory.value / "custom_lib"

3. build.sbtのspark対応
scalaのバージョン番号をsparkのライブラリ末尾に入れる必要がある。
インストールしたsparkのパスを通してから、
sparkのバージョンとmarvenのリポジトリに対応した記載にすること。
Maven Repository: spark

//build.sbt
scalaVersion := "2.11.8"
val sparkVersion = "2.3.0"
libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % sparkVersion
libraryDependencies += "org.apache.spark" % "spark-hive_2.11" % sparkVersion

適宜追加予定

tensorflowとcupyのwindows環境インストール(cuDNN、CUDA)(Linuxのみ2018/1/21時点の情報)

追記:WindowsはCUDA9.1未対応。Windowsは下記参照

tatabox.hatenablog.com

cuDNN7でpython3.6とCUDA9.1に対応。インストールが簡単になった。

1. CUDAを下記から落としてインストール

CUDA (現時点では9.1が最新)

https://developer.nvidia.com/cuda-downloads

2. cuDNNを下記から落とす。

(7以上のwindows版を選択。CUDAのバージョンに揃える)
cuDNN

https://developer.nvidia.com/rdp/form/cudnn-download-survey

3. cuDNNを展開し下記フォルダに入れる

(末尾の数値はバージョンに合わせて適宜変更)
C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v9.1

4. 下記サイトでVisual C++ Build Tools 2015をインストール

Download the Visual C++ Build Tools (standalone C++ compiler, libraries and tools)

cuDNNはVS2017には対応していないので注意

5. 環境変数に下記を設定

(CUDAの後の番号はバージョンに応じて適宜変更)
INCLUDE
C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v9.1\include
C:\Program Files (x86)\Windows Kits\10\Include\10.0.10240.0\ucrt
C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v9.1\bin
PATH
C:\Program Files (x86)\Microsoft Visual Studio 14.0\VC\bin

6. pip install

pip install cupy
pip install tensorflow-gpu

RDkitのインストールとPycharm用の設定

ケムイオンフォマティクス用のツールキット、
RDkitのインストールとPycharmの設定に苦労したのでメモ

公式の通りにインストールするとimport Errorで苦しむので注意

(自己紹介)

1. Anacondaをインストール

www.anaconda.com

2. Anacondaプロンプトに下記のコマンドを入力

$ conda install -c rdkit rdkit 

3. Pycharmのワーキングディレクトリ変更

run⇒Edit configuration ⇒working directory
C:\ProgramData\Anaconda3
に設定

参考

Rdkit :: Anaconda Cloud

以上

tensorflowとcupyのwindows環境インストール(cuDNN、CUDA)(2017/10/27時点の情報)

1. CUDAを下記から落としてインストール

CUDA (9はwindowsで動かないので8台を選択) https://developer.nvidia.com/cuda-downloads

2. cuDNNを下記から落とす。

(6以上のwindows版を選択)
cuDNN

https://developer.nvidia.com/rdp/form/cudnn-download-survey

3. cuDNNを展開し下記フォルダに入れる

(末尾の数値はバージョンに合わせて適宜変更)
C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v8.0

4. 下記サイトでVisual C++ Build Tools 2015をインストール

Download the Visual C++ Build Tools (standalone C++ compiler, libraries and tools)

cuDNNはVS2017には対応していないので注意

5. 環境変数に下記を設定

(CUDAの後の番号はバージョンに応じて適宜変更)
INCLUDE
C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v8.0\include
C:\Program Files (x86)\Windows Kits\10\Include\10.0.10240.0\ucrt
C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v8.0\bin
PATH
C:\Program Files (x86)\Microsoft Visual Studio 14.0\VC\bin

6 Pythonを3.5.2 にダウングレード

Anaconda Prompt を起動してconda install python=3.5.2 と入力。
tensorflowが3.6に対応していないため。

7. pip install

pip install cupy
pip install tensorflow-gpu

まあ、Ubuntuが楽ですね、という話でした。