Pythonで並列処理をしてみた話
こんばんは、たくさん寝太郎です。
研究で扱ってるデータの処理に時間がかかるので並列処理を試してみました。
扱っているデータに関しては以前触れているので、詳しく知りたい方はそちらの記事を参照してください。
np.loadtxtが遅いのでpd.read_tableを使ったら速くなった話 - たくさん寝太郎の寝床
やりたいこと
: イベント毎の波形データの集合
例えばは以下のようになっています。
この場合だとは(10,2)行列です。
0.000 0.035034179688 10.00 0.011789055135 20.00 0.001481085258 30.00 0.014500648051 40.00 0.068899594247 50.00 0.088201983041 60.00 0.068874968657 70.00 0.144857865647 80.00 0.186440028392 90.00 0.220357148713
任意のの二列目成分を取ってきて1つの行列にまとめます。
d_matrix = [] for d in D: d_matrix.append(d[:,1]) d_matrix = np.array(d_matrix)
この行列にまとめる作業を並列処理により高速化出来ないかと考えました。
joblibで並列処理
以前の記事では処理時間が50秒ほどから20秒ほどになりました。
くらいであればforループでを順に1つずつ処理しても高々数分で終わりますが、が大きくなってくるとforループでは少し処理時間が遅く感じます。
そこで、以下の記事を参照してjoblibで並列処理を行ってみました。
qiita.com
path = 'データが入ってるディレクトリ(D)' day_list = sorted(os.listdir(path)) #これの要素がD_n def soundevent_data(day): sd_data = [] sound_path = path+day+'/' sound_list = sorted(os.listdir(path+day)) for d in sound_list: events = pd.read_table(sound_path+"{}".format(d), delimiter=' ', header=None, names=['frequency', 'dB']) sd_data.append(events['dB']) X = np.array(sd_data).astype('float16') with open('dfs/{}.cmp'.format(day), 'wb') as f: joblib.dump((X), f, compress=3) if __name__=="__main__": Parallel(n_jobs=-1)([delayed(soundevent_data)(day) for day in day_list])
実行時間は以下のようになりました。
今回の実験ではでしたが、かなり時間が短縮されていることがわかります。
# 従来法 real 0m43.192s user 0m40.702s sys 0m5.594s # 並列処理 real 0m7.608s user 1m3.185s sys 0m11.903s
めでたしめでたし
と言いたいところですが、「これってデータの順番保持されてんのかな」と気になりました。私の研究ではイベントの順番が重要なので、を行列としてまとめる時に順番が異なってしまうと大変困ります。
試しに今までの方法と出力を比較してみると案の定イベントの順番が異なっていました。前半は同じですが、後半でイベントの順番が異なっています。
(上が正しい場合、下が並列処理した場合)
「イベントにid付けて最後にソートすれば何とかならんかな」と考え、少し調べてみると既に同じようなことを考えている人がいました。
そこで、以下のようにコードを変更しイベントにidを付けてソートするように変更してみました。
def soundevent_data(day): sd_data = [] sound_path = path+day+'/' sound_list = sorted(os.listdir(path+day)) for idx, d in enumerate(sound_list): events = pd.read_table(sound_path+"{}".format(d), delimiter=' ', header=None, names=['frequency', 'dB']) sd_data.append((idx, events['dB'])) #イベントとidを対応付ける sd_data.sort(key=lambda x: x[0]) #idでソート sd_data = [sd[1] for sd in sd_data] X = np.array(sd_data).astype('float16') with open('dfs/{}.cmp'.format(day), 'wb') as f: joblib.dump((X), f, compress=3)
データの順番が正しく保持されているかを確認したところ、無事従来法と同じように並んでいました。
青線が正しい場合で、橙線が並列処理を行った場合です。
最後に処理時間の比較を行いました。
idでソートしている分少し処理時間が増えるのかなと思いましたが、ほぼ変わらない結果となりました。
# 従来法 real 0m59.524s user 0m54.511s sys 0m6.096s # 並列処理(イベント順を考慮しない) real 0m14.040s user 1m15.081s sys 0m12.103s # 並列処理(イベント順を考慮する) real 0m14.473s user 1m15.515s sys 0m12.057s
おわり