Data Processing
Import Required Libraries
# Keras for deep learning
from keras.layers.core import Dense, Activation, Dropout
from keras.layers.recurrent import LSTM
from keras.layers import Bidirectional
from keras.models import Sequential
# Scikit-learn for metrics
from sklearn.metrics import mean_squared_error
# Additional libraries
import time
import numpy as np
import math
import matplotlib.pyplot as plt
import pandas as pd

Normalize Data
def load_data(filename, sequence_length, target_column=20):
    """Load a numeric CSV and build normalized, windowed train/test arrays.

    The file is read as floats, zero entries are replaced by the previous
    row's value in the same column, and the rows are cut into overlapping
    windows of ``sequence_length`` consecutive rows. Every window is then
    expressed relative to its own first row (``value / first - 1``), so the
    first timestep of each window is 0.

    Args:
        filename: Path of the CSV file; every column must parse as float.
        sequence_length: Number of consecutive rows per window. The last row
            of a window is the prediction target, the rows before it are the
            model input. Must be at least 2.
        target_column: Index of the column whose last-timestep value is
            returned as the label. Defaults to 20, the value that was
            previously hard-coded.

    Returns:
        Tuple ``(X_train, Y_train, X_test, Y_test)``. The first 90% of the
        windows (shuffled in place with ``np.random.shuffle``) form the
        training set, the remaining 10% the test set. ``X_*`` has shape
        ``(windows, sequence_length - 1, columns)``; ``Y_*`` has shape
        ``(windows,)``.

    Raises:
        ValueError: If ``sequence_length`` is below 2 or the file has too few
            rows to form a single window.
    """
    # Copy so the in-place zero replacement never writes into a pandas buffer.
    raw_data = np.array(pd.read_csv(filename, dtype=float).values, dtype=float)
    n_rows = raw_data.shape[0]

    if sequence_length < 2:
        raise ValueError("sequence_length must be at least 2")
    if n_rows <= sequence_length:
        raise ValueError(
            "need more than sequence_length rows to build a window "
            "(got %d rows, sequence_length=%d)" % (n_rows, sequence_length)
        )

    # Row 0 has no previous row. Indexing row -1 would silently pull values
    # from the END of the series, so back-fill from the first non-zero entry
    # of the column instead.
    for col in np.flatnonzero(raw_data[0] == 0):
        nonzero_rows = np.flatnonzero(raw_data[:, col])
        if nonzero_rows.size:
            raw_data[0, col] = raw_data[nonzero_rows[0], col]

    # Replace remaining zeros with the previous row's (already repaired) value.
    for row in range(1, n_rows):
        zero_mask = raw_data[row] == 0
        raw_data[row, zero_mask] = raw_data[row - 1, zero_mask]

    # Convert to a 3D array: (window, timestep, column).
    windows = np.array([
        raw_data[start:start + sequence_length]
        for start in range(n_rows - sequence_length)
    ])

    # Normalize each window against its first timestep; timestep 0 stays 0.
    normalized = np.zeros_like(windows)
    normalized[:, 1:, :] = windows[:, 1:, :] / windows[:, 0:1, :] - 1

    # Split into training (90%) and testing (10%).
    split_line = int(round(0.9 * normalized.shape[0]))
    training_data = normalized[:split_line]
    np.random.shuffle(training_data)

    X_train = training_data[:, :-1]
    Y_train = training_data[:, -1, target_column]
    X_test = normalized[split_line:, :-1]
    # Use the last timestep (-1) rather than a fixed index so the label is
    # correct for any sequence_length, not just 50.
    Y_test = normalized[split_line:, -1, target_column]
    return X_train, Y_train, X_test, Y_test


# Model Building
Initialize a 3-Layer Bidirectional RNN
def initialize_model(window_size, dropout_value=0.2, activation_function='linear',
                     loss_function='mse', optimizer='adam', num_features=None):
    """Build and compile a three-layer bidirectional LSTM regressor.

    The stack is: Bidirectional LSTM (``window_size`` units, sequences) ->
    Dropout -> Bidirectional LSTM (``2 * window_size`` units, sequences) ->
    Dropout -> Bidirectional LSTM (``window_size`` units, last state only) ->
    Dense(1) -> Activation.

    Args:
        window_size: Number of timesteps per input sample; also used as the
            LSTM unit count of the first and third layers.
        dropout_value: Dropout rate applied after the first two layers.
        activation_function: Activation applied to the single output unit.
        loss_function: Loss passed to ``model.compile``.
        optimizer: Optimizer passed to ``model.compile``.
        num_features: Number of features per timestep. When ``None`` it is
            read from the module-level ``X_train`` array, which is what this
            function always did before the parameter existed.

    Returns:
        The compiled ``Sequential`` model.
    """
    if num_features is None:
        # Backward-compatible fallback: requires a global X_train to exist.
        num_features = X_train.shape[-1]

    model = Sequential()

    # Layer 1
    model.add(Bidirectional(LSTM(window_size, return_sequences=True),
                            input_shape=(window_size, num_features)))
    model.add(Dropout(dropout_value))

    # Layer 2
    model.add(Bidirectional(LSTM(window_size * 2, return_sequences=True)))
    model.add(Dropout(dropout_value))

    # Layer 3: only the final state is passed on to the output layer.
    model.add(Bidirectional(LSTM(window_size, return_sequences=False)))

    # Output layer
    model.add(Dense(units=1))
    model.add(Activation(activation_function))

    model.compile(loss=loss_function, optimizer=optimizer)
    return model


# Training & Testing
Train the Model
def fit_model(model, X_train, Y_train, batch_num=1024, num_epoch=100, val_split=0.1):
    """Train ``model`` and report how long training took.

    Args:
        model: Object exposing a Keras-style ``fit`` method.
        X_train: Training inputs, forwarded to ``model.fit``.
        Y_train: Training targets, forwarded to ``model.fit``.
        batch_num: Passed to ``fit`` as ``batch_size``.
        num_epoch: Passed to ``fit`` as ``epochs``.
        val_split: Passed to ``fit`` as ``validation_split``.

    Returns:
        Tuple ``(model, training_time)`` where ``training_time`` is the
        elapsed time in whole seconds, rounded down.
    """
    # perf_counter is monotonic, so the measurement cannot go negative or
    # jump if the system clock is adjusted while training runs.
    start = time.perf_counter()
    model.fit(
        X_train,
        Y_train,
        batch_size=batch_num,
        epochs=num_epoch,
        validation_split=val_split,
    )
    training_time = int(time.perf_counter() - start)
    return model, training_time


# Test the Model
def test_model(model, X_test, Y_test, unnormalized_bases):
    """Predict on the test set, undo the normalization, and plot the result.

    Args:
        model: Object exposing a ``predict`` method.
        X_test: Test inputs, forwarded to ``model.predict``.
        Y_test: Normalized true targets, one per test sample.
        unnormalized_bases: Per-sample base values used to reverse the
            normalization via ``(value + 1) * base``. Must be indexable and
            at least as long as ``Y_test`` and the prediction output.

    Returns:
        Tuple ``(y_predict, real_y_test, real_y_predict)``: the raw model
        output, followed by two lists holding the de-normalized true and
        predicted values.
    """
    y_predict = model.predict(X_test)

    # Reverse normalization: normalized = real / base - 1.
    real_y_test = [
        (actual + 1) * unnormalized_bases[i] for i, actual in enumerate(Y_test)
    ]
    real_y_predict = [
        (predicted + 1) * unnormalized_bases[i]
        for i, predicted in enumerate(y_predict)
    ]

    # Plot results
    plt.figure(figsize=(10, 5))
    plt.title("Bitcoin Price Over Time")
    plt.plot(real_y_predict, color='green', label='Predicted Price')
    plt.plot(real_y_test, color='red', label='Real Price')
    plt.ylabel("Price (USD)")
    plt.xlabel("Time (Days)")
    plt.legend()
    plt.show()

    return y_predict, real_y_test, real_y_predict


# Performance Analysis
Price Change Visualization
def price_change(Y_daybefore, Y_test, y_predict):
    """Plot predicted versus real day-over-day change.

    Each change is computed as ``(value - Y_daybefore) / (1 + Y_daybefore)``
    and drawn as a line; nothing is returned.

    Args:
        Y_daybefore: Normalized values for the day before each target.
        Y_test: Normalized true target values.
        y_predict: Normalized predicted target values.

    NOTE(review): the arithmetic is elementwise, so all three inputs are
    assumed to be arrays of matching shape — confirm at the call site, since
    mismatched shapes would broadcast rather than fail.
    """
    # The same denominator converts both series into a change relative to
    # the previous day.
    previous_level = 1 + Y_daybefore
    delta_predict = (y_predict - Y_daybefore) / previous_level
    delta_real = (Y_test - Y_daybefore) / previous_level

    plt.figure(figsize=(10, 6))
    plt.title("Percent Change in Bitcoin Price Per Day")
    plt.plot(delta_predict, color='green', label='Predicted Percent Change')
    plt.plot(delta_real, color='red', label='Real Percent Change')
    plt.ylabel("Percent Change")
    plt.xlabel("Time (Days)")
    plt.legend()
    plt.show()


# Model Metrics
- Precision: 0.62
- Recall: 0.55
- F1 Score: 0.58
- MSE: 0.043
FAQ
Q1: Why use a bidirectional RNN?
A: Bidirectional RNNs capture context from both past and future data points, improving accuracy for time-series predictions like cryptocurrency prices.
Q2: How is data normalized?
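A: Each window’s values are divided by the window’s first value, then 1 is subtracted. This expresses every value as a fractional change relative to the start of its window (the first value becomes 0); for positive prices the result is never below -1, but it has no fixed upper bound.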
Q3: What’s the ideal batch size?
A: A batch size of 1024 balances computational efficiency and model performance for this dataset.