たくさん寝太郎の寝床

料理とITと皿回しが好きなオタクのブログ

Pythonで並列処理をしてみた話

こんばんは、たくさん寝太郎です。

研究で扱ってるデータの処理に時間がかかるので並列処理を試してみました。
扱っているデータに関しては以前触れているので、詳しく知りたい方はそちらの記事を参照してください。

np.loadtxtが遅いのでpd.read_tableを使ったら速くなった話 - たくさん寝太郎の寝床


やりたいこと

D_n: イベント毎の波形データの集合  \in D

例えば d_1 \in D_nは以下のようになっています。
この場合だと d_nは(10,2)行列です。

0.000 0.035034179688
10.00 0.011789055135
20.00 0.001481085258
30.00 0.014500648051
40.00 0.068899594247
50.00 0.088201983041
60.00 0.068874968657
70.00 0.144857865647
80.00 0.186440028392
90.00 0.220357148713


任意のd \in D_nの二列目成分を取ってきて1つの行列にまとめます。

d_matrix = []
for d in D:
    d_matrix.append(d[:,1])

d_matrix = np.array(d_matrix)

この行列にまとめる作業を並列処理により高速化出来ないかと考えました。

joblibで並列処理

以前の記事では処理時間が50秒ほどから20秒ほどになりました。
|D|=10くらいであればforループで D_nを順に1つずつ処理しても高々数分で終わりますが、|D|が大きくなってくるとforループでは少し処理時間が遅く感じます。

そこで、以下の記事を参照してjoblibで並列処理を行ってみました。
qiita.com

path = 'データが入ってるディレクトリ(D)'
day_list = sorted(os.listdir(path))   #これの要素がD_n

def soundevent_data(day):
    sd_data = []
    sound_path = path+day+'/'
    sound_list = sorted(os.listdir(path+day))

    for d in sound_list:
        events = pd.read_table(sound_path+"{}".format(d), delimiter=' ', header=None, names=['frequency', 'dB'])
        sd_data.append(events['dB'])

    X = np.array(sd_data).astype('float16')

    with open('dfs/{}.cmp'.format(day), 'wb') as f:
        joblib.dump((X), f, compress=3)


if __name__=="__main__":
    Parallel(n_jobs=-1)([delayed(soundevent_data)(day) for day in day_list])

実行時間は以下のようになりました。
今回の実験では |D|=16でしたが、かなり時間が短縮されていることがわかります。

# 従来法
real	0m43.192s
user	0m40.702s
sys	0m5.594s


# 並列処理
real	0m7.608s
user	1m3.185s
sys	0m11.903s


めでたしめでたし



と言いたいところですが、「これってデータの順番保持されてんのかな」と気になりました。私の研究ではイベントの順番が重要なので、d_nを行列としてまとめる時に順番が異なってしまうと大変困ります。

試しに今までの方法と出力を比較してみると案の定イベントの順番が異なっていました。前半は同じですが、後半でイベントの順番が異なっています。
(上が正しい場合、下が並列処理した場合)

f:id:kaworu_mk6:20210108172330p:plain


「イベントにid付けて最後にソートすれば何とかならんかな」と考え、少し調べてみると既に同じようなことを考えている人がいました。

qiita.com

そこで、以下のようにコードを変更しイベントにidを付けてソートするように変更してみました。

def soundevent_data(day):
    sd_data = []
    sound_path = path+day+'/'
    sound_list = sorted(os.listdir(path+day))

    for idx, d in enumerate(sound_list):
        events = pd.read_table(sound_path+"{}".format(d), delimiter=' ', header=None, names=['frequency', 'dB'])
        sd_data.append((idx, events['dB'])) #イベントとidを対応付ける

    sd_data.sort(key=lambda x: x[0]) #idでソート
    sd_data = [sd[1] for sd in sd_data]
    X = np.array(sd_data).astype('float16')

    with open('dfs/{}.cmp'.format(day), 'wb') as f:
        joblib.dump((X), f, compress=3)

データの順番が正しく保持されているかを確認したところ、無事従来法と同じように並んでいました。
青線が正しい場合で、橙線が並列処理を行った場合です。

f:id:kaworu_mk6:20210108173921p:plain


最後に処理時間の比較を行いました。
idでソートしている分少し処理時間が増えるのかなと思いましたが、ほぼ変わらない結果となりました。

# 従来法
real	0m59.524s
user	0m54.511s
sys	0m6.096s


# 並列処理(イベント順を考慮しない)
real	0m14.040s
user	1m15.081s
sys	0m12.103s


# 並列処理(イベント順を考慮する)
real	0m14.473s
user	1m15.515s
sys	0m12.057s

おわり