【Python】機械学習用の時系列データを生成してみる

内容

機械学習の勉強をするためには学習・推論するためのデータが必要となります
今回はnumpyのsin関数を使って正常波形と異常波形の時系列データを生成してみます

さっそくコードを書いてみる

モジュールをインポートします

import numpy as np
import matplotlib.pyplot as plt

テストデータを生成してみます

#変数をいろいろ設定
sampling_frequency=100000 #サンプリング周波数
sampling_length=5000 #サンプリング点数
data_freq_main=1000 #データの主周波数
data_learn_num=500 #学習用生成データ数
data_test_num=50 #テスト用生成データ数
data_freq_abnomal=20000 #異常データ重畳周波数
data_amp_abnomal_1=0.05 #異常データ振幅レベル1
data_amp_abnomal_2=0.1 #異常データ振幅レベル2

#データ用配列を定義
data_nomal_learn=[] #学習用正常データ
data_nomal_test=[] #テスト用正常データ
data_abnomal_noise1=[] #高周波ノイズ レベル1
data_abnomal_noise2=[] #高周波ノイズ レベル2
data_abnomal_amp1=[] #振幅増加 レベル1
data_abnomal_amp2=[] #振幅増加 レベル2
data_abnomal_naa1=[] #高周波ノイズ + 振幅増加 レベル1
data_abnomal_naa2=[] #高周波ノイズ + 振幅増加 レベル2

#時間データ生成
timeline = np.arange(sampling_length)/sampling_frequency

#学習用データ生成
for i in range(data_learn_num):
    phase = np.random.rand()*2*np.pi
    nomal = np.sin(2*np.pi*data_freq_main*timeline + phase)
    white_noise = ((np.random.rand(len(timeline))-0.5) * 0.2)
    data_nomal_learn.append( nomal + white_noise)

#テスト用データ生成
for i in range(data_test_num):
    phase=np.random.rand()*2*np.pi
    phase_noise=np.random.rand()*2*np.pi
    nomal=np.sin(2*np.pi*data_freq_main*timeline + phase)
    white_noise = ((np.random.rand(len(timeline))-0.5) * 0.2)
    abnomal_noise = np.sin(2*np.pi*data_freq_abnomal*timeline + phase_noise)
    data_nomal = nomal + white_noise
    data_nomal_test.append(data_nomal)
    data_abnomal_noise1.append( data_nomal + abnomal_noise * data_amp_abnomal_1 ) 
    data_abnomal_noise2.append( data_nomal + abnomal_noise * data_amp_abnomal_2 ) 
    data_abnomal_amp1.append( data_nomal * (1+data_amp_abnomal_1))
    data_abnomal_amp2.append( data_nomal * (1+data_amp_abnomal_2))
    data_abnomal_naa1.append( data_nomal * (1+data_amp_abnomal_1) + abnomal_noise * data_amp_abnomal_1 )
    data_abnomal_naa2.append( data_nomal * (1+data_amp_abnomal_2) + abnomal_noise * data_amp_abnomal_2 ) 

生成データの確認をしてみます

plt.style.use(['default'])
plt.figure(figsize=(12,3))
plt.subplot(1,3,1)
plt.plot(timeline, data_nomal_learn[0])
plt.xlabel("time [sec]")
plt.ylabel("data")
plt.subplot(1,3,2)
plt.plot(timeline[:100], data_nomal_learn[0][:100])
plt.subplot(1,3,3)
plt.plot(timeline[:100], data_nomal_test[0][:100],label='nomal')
plt.plot(timeline[:100], data_abnomal_noise2[0][:100],label='noise')
plt.plot(timeline[:100], data_abnomal_amp2[0][:100],label='amp')
plt.legend()
plt.show()

生成結果

f:id:tech-foo:20220215200337p:plain
左が正常波形の全体データ、中央が正常波形の一部データ、右が正常波形と異常波形の一部データです。
なんとなく意図通りに生成できていることが確認できました



改良版

#変数をいろいろ設定
sampling_frequency = 100000 #サンプリング周波数
sampling_length = 5000 #サンプリング点数
data_train_num = 500 #学習用生成データ数
data_test_num = 50 #テスト用生成データ数

freq_nomal_low = 1000 #データの主周波数
freq_nomal_high = 15000 #正常データ重畳周波数
freq_abnomal = 11000 #異常データ重畳周波数
freq_shift_1 = 14000 
freq_shift_2 = 13000 

amp_nomal_low = 1.0 #正常データ振幅レベル
amp_nomal_high = 0.2 #正常データ振幅レベル
amp_white_noise = 0.2 #正常データ振幅レベル
amp_abnomal_noise_1 = 0.1 #異常データ振幅レベル1
amp_abnomal_noise_2 = 0.2 #異常データ振幅レベル2
amp_abnomal_level_1 = 0.5 #異常データ振幅レベル1
amp_abnomal_level_2 = 1.0 #異常データ振幅レベル2

#データ用配列を定義
data_raw_train=[] #学習用正常データ
data_raw_test={}
data_raw_test['nomal']=[] #テスト用正常データ
data_raw_test['noise_1']=[] #高周波ノイズ レベル1
data_raw_test['noise_2']=[] #高周波ノイズ レベル2
data_raw_test['amp_1']=[] #振幅増加 レベル1
data_raw_test['amp_2']=[] #振幅増加 レベル2
data_raw_test['noise_amp_1']=[] #高周波ノイズ + 振幅増加 レベル1
data_raw_test['noise_amp_2']=[] #高周波ノイズ + 振幅増加 レベル2
data_raw_test['high_shift_1']=[] #高周波ノイズ + 振幅増加 レベル1
data_raw_test['high_shift_2']=[] #高周波ノイズ + 振幅増加 レベル2

#時間データ生成
timeline = np.arange(sampling_length)/sampling_frequency

def func_make_sin(frequency):
    phase = 2 * np.pi * np.random.rand()
    return np.sin(2 * np.pi * frequency * timeline + phase)

#学習用データ生成
for i in range(data_train_num):
    nomal_low = func_make_sin(freq_nomal_low) * amp_nomal_low
    nomal_high = func_make_sin(freq_nomal_high) * amp_nomal_high
    white_noise = (np.random.rand(len(timeline))-0.5) * 2 * amp_white_noise
    data_raw_train.append(nomal_low + nomal_high + white_noise) 

#テスト用データ生成
for i in range(data_test_num):
    nomal_low = func_make_sin(freq_nomal_low) * amp_nomal_low
    nomal_high = func_make_sin(freq_nomal_high) * amp_nomal_high
    white_noise = (np.random.rand(len(timeline))-0.5) * 2 * amp_white_noise
    nomal = nomal_low + nomal_high + white_noise
    abnomal_amp_1 = nomal_high * amp_abnomal_level_1
    abnomal_amp_2 = nomal_high * amp_abnomal_level_2
    abnomal_noise_1 = func_make_sin(freq_abnomal) * amp_abnomal_noise_1
    abnomal_noise_2 = func_make_sin(freq_abnomal) * amp_abnomal_noise_2
    abnomal_shift_1 = nomal_low + func_make_sin(freq_shift_1) * amp_nomal_high + white_noise
    abnomal_shift_2 = nomal_low + func_make_sin(freq_shift_2) * amp_nomal_high + white_noise
    
    data_raw_test['nomal'].append(nomal) 
    data_raw_test['noise_1'].append( nomal + abnomal_noise_1 ) 
    data_raw_test['noise_2'].append( nomal + abnomal_noise_2 ) 
    data_raw_test['amp_1'].append( nomal + abnomal_amp_1 ) 
    data_raw_test['amp_2'].append( nomal + abnomal_amp_2 ) 
    data_raw_test['noise_amp_1'].append( nomal + abnomal_amp_1 + abnomal_noise_1 ) 
    data_raw_test['noise_amp_2'].append( nomal + abnomal_amp_2 + abnomal_noise_2 ) 
    data_raw_test['high_shift_1'].append(abnomal_shift_1) 
    data_raw_test['high_shift_2'].append(abnomal_shift_2) 

#生成データの確認
plt.style.use(['default'])
plt.figure(figsize=(12,3))
plt.subplot(1,3,1)
plt.plot(timeline, data_raw_train[0])
plt.xlabel("time [sec]")
plt.ylabel("data")
plt.subplot(1,3,2)
plt.plot(timeline[:100], data_raw_train[0][:100])
plt.subplot(1,3,3)
plt.plot(timeline[:100], data_raw_test['nomal'][0][:100],label='nomal')
plt.plot(timeline[:100], data_raw_test['noise_2'][0][:100],label='noise')
plt.plot(timeline[:100], data_raw_test['amp_2'][0][:100],label='amp')
plt.legend()
plt.show()

WSL2のメモリ使用量を制限する

WSL2はPCメモリを制限なく使用して枯渇させる恐れがある

WSL2のメモリ指定方法

ユーザーディレクトリ(例:C:\Users\foo)に.wslconfigを下記内容で作成する
作成後はPCを再起動する

.wslconfig

[wsl2]
memory=6GB
swap=0

memoryのGB数は適宜変更する

メモリ指定確認方法

WSL2で下記コマンドを実行

free -h

はてなブログで2個目のブログを新規作成

サブアカウントを追加

Myはてなの設定タブでサブアカウントを追加する

追加にはメールアドレスが必要

既にあるブログと完全に独立したブログになる
(本ブログから特定されない 逆も然り)

Googleアナリティクスに追加

Googleアナリティクスを開く → 管理 → プロパティを作成 

→ プロパティ名を適当に入力 → タイムゾーンで日本を選択

→ 詳細オプションを表示 → サイトURLを入力 → 次へ

→ 目的などを適当に選択 → UAから始まるトラッキングIDをコピー

→ はてなブログの詳細設定で「Google Analytics 埋め込み」にペースト
  (ページ下の「変更する」を忘れずにクリック)

Googleサーチコンソールに追加

Googleサーチコンソールを開く → プロパティを追加

→ URLプレフィックスでURLを入力

→ 所有権の確認で「その他の確認方法」の「HTMLタグ」を選択

→ メタタグ(contentsの ” ” 内の文字列)をコピー

→ はてなブログの詳細設定で「Google Search Console 」にペースト
  (ページ下の「変更する」を忘れずにクリック)

→ Googleサーチコンソールで「確認」をクリック

 

メモ

# %%
#入力用データに変換
DATA_MAX=1.5
DATA_MIN=-1.5
def func_make_input(source):
    input_train=np.array(source)
    input_train=(input_train-DATA_MIN)/(DATA_MAX-DATA_MIN)
    input_train=np.clip(input_train,0.0,1.0)
    print(input_train.shape)
    return np.reshape(input_train, (input_train.shape[0], input_train.shape[1], 1))
train_X = func_make_input(data_raw_train)
train_Y = train_X

print(train_X.shape)
print(train_Y.shape)

plt.style.use(['default'])
plt.figure(figsize=(12,3))
plt.subplot(1,2,1)
plt.plot(train_X[0])
plt.subplot(1,2,2)
plt.plot(train_X[0][:100])
plt.show()

# %%
model = keras.models.Sequential()
# model.add(keras.layers.LSTM(300, activation='relu', return_sequences=True, input_shape=(LEN_INPUT, 1)))
# model.add(keras.layers.LSTM(300, activation='relu'))
# model.add(keras.layers.Dense(25, activation='softmax'))
# model.compile(loss='mse', optimizer='adam')
model.add(keras.layers.Conv1D(10, 10, padding='same', input_shape=(train_X.shape[1], 1), activation='relu'))
model.add(keras.layers.MaxPooling1D(2, padding='same'))
model.add(keras.layers.Conv1D(10, 10, padding='same', activation='relu'))
model.add(keras.layers.MaxPooling1D(2, padding='same'))
model.add(keras.layers.Conv1D(10, 10, padding='same', activation='relu'))
model.add(keras.layers.UpSampling1D(2))
model.add(keras.layers.Conv1D(10, 10, padding='same', activation='relu'))
model.add(keras.layers.UpSampling1D(2))
model.add(keras.layers.Conv1D(1, 10, padding='same', activation='sigmoid'))
model.compile(loss='mse', optimizer='adam')
print(model.summary())

 

# %%
# EPOCHS=30
# #学習
# history = model.fit(train_X, train_Y, validation_split=0.1, epochs=EPOCHS)
# model.save('test.h5')
# #損失プロット
# plt.plot(range(EPOCHS), history.history['loss'], label='loss')
# plt.plot(range(EPOCHS), history.history['val_loss'], label='val_loss')
# plt.xlabel('epoch')
# plt.ylabel('loss')
# plt.legend() 
# plt.show()

model = keras.models.load_model('test.h5')

# %%

dic_mse={}
dic_output={}
for key,value in data_raw_test.items():
    output = model.predict(func_make_input(value))
    dic_output[key]=output
    list_mse=[]
    for i in range(output.shape[0]):
        list_mse.append( mean_squared_error(value[i], output[i]))
    dic_mse[key]=list_mse

plt.figure(figsize=(14,3))
plt.subplot(121)
for key,value in dic_mse.items():
    plt.plot(value,label=key)
plt.legend() 

plt.subplot(122)
plt.boxplot(dic_mse.values(),labels=dic_mse.keys())
plt.xticks(rotation=45)
plt.grid(True)
plt.show()


plt.style.use(['default'])
plt.figure(figsize=(14,5))

for k,(key,value) in enumerate(dic_output.items(),1):
    plt.subplot(2,5,k)
    for i in range(1):
        plt.plot(value[i,:100],label=key+str(i))
        input=func_make_input(data_raw_test[key])
        plt.plot(input[i][:100],label=key+str(i)+' raw')
    plt.legend()
plt.show()

 

# %%

1d-CNNとりあえずやってみる

モジュールをインポート

import numpy as np
import matplotlib.pyplot as plt
from keras.models import Sequential
from keras.layers import Dense, Dropout
from keras.layers.convolutional import Conv1D, UpSampling1D
from keras.layers.pooling import MaxPooling1D

サンプルデータ生成

LEN_DATA=10000
timeline = np.arange(LEN_DATA)
raw_data = np.sin(timeline * 1 / 100) + np.cos(timeline * 3 / 100)/2
raw_data = raw_data + (np.random.rand(len(timeline)) * 0.3)# ノイズ項
plt.style.use(['default'])
plt.plot(timeline[:5000], raw_data[:5000])
plt.xlabel("time")
plt.ylabel("data")
plt.show()

入力データに変換

LEN_INPUT=64
LEN_OUTPUT=16
LEN_TEST=5000
DATA_MAX=1.8
DATA_MIN=-1.5
input_data = []
output_data = []
raw_data=(raw_data-DATA_MIN)/(DATA_MAX-DATA_MIN)
raw_data=np.clip(raw_data,0.0,1.0)
for n in range(LEN_DATA-LEN_INPUT-LEN_OUTPUT):
    input_data.append(raw_data[n:n+LEN_INPUT])
    output_data.append(raw_data[n+LEN_INPUT:n+LEN_INPUT+LEN_OUTPUT])

input_data = np.array(input_data[:LEN_TEST])
output_data = np.array(output_data[:LEN_TEST])
print(input_data.shape)
print(output_data.shape)

train_X = np.reshape(input_data, (-1, LEN_INPUT, 1))
train_Y = np.reshape(output_data, (-1, LEN_OUTPUT, 1))
print(train_X.shape)
print(train_Y.shape)

NNモデル定義

model = Sequential()
model.add(Conv1D(64, 8, padding='same', input_shape=(64, 1), activation='relu'))
model.add(MaxPooling1D(2, padding='same'))
model.add(Conv1D(64, 8, padding='same', activation='relu'))
model.add(MaxPooling1D(2, padding='same'))
model.add(Conv1D(32, 8, padding='same', activation='relu'))
model.add(Conv1D(1, 8, padding='same', activation='tanh'))

model.compile(loss='mse', optimizer='adam')
model.summary()

学習

EPOCHS=100
#学習
history = model.fit(train_X, train_Y, validation_split=0.1, epochs=EPOCHS)
#損失プロット
plt.plot(range(EPOCHS), history.history['loss'], label='loss')
plt.plot(range(EPOCHS), history.history['val_loss'], label='val_loss')
plt.xlabel('epoch')
plt.ylabel('loss')
plt.legend() 
plt.show()

推論テスト

start = LEN_TEST
sheed = np.reshape(raw_data[start:start+LEN_INPUT], (1, LEN_INPUT, 1))
prediction = sheed
sheed2=sheed
prediction2 = sheed2

for i in range(20):
    res = model.predict(sheed)
    sheed = np.concatenate((sheed[:, 16:, :], res), axis=1)
    prediction = np.concatenate((prediction, res), axis=1)

    res2 = model.predict(sheed2)
    sheed2 = np.reshape(raw_data[start+(i+1)*16:start+64+(i+1)*16], (1, 64, 1))
    prediction2 = np.concatenate((prediction2, res2), axis=1)

print(prediction.shape)
print(prediction2.shape)
predictor = np.reshape(prediction, (-1))
predictor2 = np.reshape(prediction2, (-1))
print(predictor.shape)
print(predictor2.shape)
plt.plot(range(len(predictor)), raw_data[start:start + len(predictor)], label='real')
plt.plot(range(len(predictor)), predictor, label='predict')
plt.plot(range(len(predictor2)), predictor2, label='predict2')
plt.legend() 
plt.show()

参考:
時系列予測を一次元畳み込みを使って解く with Keras - Qiita

ubuntuのプロキシ設定

aptのプロキシ設定

sudo vi /etc/apt/apt.conf

/etc/apt/apt.conf
Acquire::http::Proxy "http://your.proxy.address:proxy.port";
Acquire::https::Proxy "http://your.proxy.address:proxy.port";

■補足:viの操作
iで挿入モード
Escape - :wqで書き込んで終了

■補足:ファイルの確認
cat /etc/apt/apt.conf

参考:
https://zenn.dev/tfukumori/articles/281abe8d507f27

 

プロキシの環境設定

vi ~/.bashrc

export https_proxy="http://username:password@your.proxy.address:proxy.port/"
export http_proxy="http://username:password@your.proxy.address:proxy.port/"

参考:
https://qiita.com/Gushi_maru/items/5ba23d997e32abc98620

 

pipのプロキシ対応

pip install --proxy="https://your.proxy.address:proxy.port" pandas

WSL2上からWindows上のフォルダを参照する

目標

下記Cドライブ直下のディレクトリのリストをWSL2上のpythonから取得する

f:id:tech-foo:20220207235543p:plain

コード

mnt というディレクトリからローカルのCドライブが参照できる

import os
print(os.listdir(path='/mnt/c/data'))


実行結果

['new', 'old']

おまけ

\\wsl$ をWindowsエクスプローラで入力するとWSL2のフォルダが参照できる