SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for double_color_ball
-- ----------------------------
DROP TABLE IF EXISTS `double_color_ball`;
CREATE TABLE `double_color_ball` (
`id` int(0) NOT NULL AUTO_INCREMENT,
`issur_code` varchar(55) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`occur_date` date NULL DEFAULT NULL,
`red_ball` varchar(55) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`blue_ball` varchar(55) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`recommend` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL,
`create_date` datetime(0) NULL DEFAULT NULL,
`update_date` datetime(0) NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 3049 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
SET FOREIGN_KEY_CHECKS = 1;
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__author__ = "HymanHu";
'''
PyMySQL utility helpers
'''
import pymysql;
# Obtain a connection object and a cursor object
def get_connection_cursor():
connect = pymysql.connect(host="127.0.0.1", port=3306, database="kg20",
user="root", password="root", charset="utf8mb4");
    # Get a cursor from the connection
cursor = connect.cursor();
return connect, cursor;
def execute_edit(cursor, sql):
return cursor.execute(sql);
def execute_query(cursor, sql):
cursor.execute(sql);
return cursor.fetchall();
def commit_(connect):
connect.commit();
def rollback_(connect):
connect.rollback();
def close_(connect, cursor):
if cursor:
cursor.close();
if connect:
connect.close();
def execute_edit_(sql):
result = None;
connect, cursor = None, None;
try:
connect, cursor = get_connection_cursor();
result = execute_edit(cursor, sql);
commit_(connect);
except BaseException as e:
print(e);
rollback_(connect);
finally:
close_(connect, cursor);
return result;
def execute_query_(sql):
result = None;
connect, cursor = None, None;
try:
connect, cursor = get_connection_cursor();
result = execute_query(cursor, sql);
except BaseException as e:
print(e);
finally:
close_(connect, cursor);
return result;
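A minimal usage sketch for the helpers above (assumptions: the module is saved as util/pymysql_util.py, and the kg20 database from the SQL dump is reachable with the hard-coded credentials):
# Hypothetical quick check of the PyMySQL helpers; the module path is an assumption.
from util.pymysql_util import execute_query_, execute_edit_

rows = execute_query_("select id, issur_code, red_ball, blue_ball from double_color_ball limit 5")
print(rows)  # tuple of rows, or None if the connection/query failed

affected = execute_edit_("update double_color_ball set recommend = '' where id = 1")
print(affected)  # number of affected rows, or None on error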
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__author__ = "HymanHu";
'''
sqlalchemy util
'''
# Add the project root to sys.path so this module can be run directly from the command line without import errors
import sys, os;
current_path = os.path.abspath(os.path.dirname(__file__));
separator = "\\" if os.name == "nt" else "/";
project_name = "python_spider" + separator;
root_path = current_path[:current_path.find(project_name) + len(project_name)]; # project root directory
sys.path.append(root_path);
from sqlalchemy import Column, String, Text, text, Integer, BigInteger, Float, Date, DateTime, ForeignKey, create_engine, and_, or_;
from sqlalchemy.orm import sessionmaker, relationship;
from sqlalchemy.ext.declarative import declarative_base;
# Declarative base class for all entity models
Base = declarative_base();
# Initialize the database engine
def init_db_engine():
engine = None;
try:
engine = create_engine("mysql+mysqlconnector://root:root@localhost:3306/kg20?auth_plugin=mysql_native_password");
except Exception as e:
print("数据库连接失败,异常:%s" % e);
return engine;
# Create the database tables
def init_db():
engine = init_db_engine();
if engine:
Base.metadata.create_all(engine, checkfirst=True);
# Initialize a DB session
def init_db_session():
engine = init_db_engine();
if engine:
session = sessionmaker(bind=engine);
return session();
else:
return None;
# Insert
def insert_(entity, key):
session = init_db_session();
if session:
try:
            '''
            Use one field of the entity bean as a unique key: check whether a matching row
            already exists in the database and insert the new data only if it does not.
            Example of the generated SQL: select * from coronavirus where date == ?
            '''
results = session.query(type(entity)).filter(
type(entity).__dict__.get(key) == entity.__dict__.get(key)
).all();
if len(results) == 0:
session.add(entity)
session.commit();
except Exception as e:
print(e);
session.rollback();
finally:
session.close();
# Update
def update_(entity, key):
d = entity.__dict__;
d.pop("_sa_instance_state");
session = init_db_session();
if session:
try:
session.query(type(entity)).filter(
type(entity).__dict__.get(key) == entity.__dict__.get(key)
).update(d);
session.commit();
except Exception as e:
print(e);
session.rollback();
finally:
session.close();
# Delete
def delete_(entity, key):
session = init_db_session();
if session:
try:
session.query(type(entity)).filter(
type(entity).__dict__.get(key) == entity.__dict__.get(key)
).delete();
session.commit();
except Exception as e:
print(e);
session.rollback();
finally:
session.close();
# Query all rows
def get_all(entity):
results = None;
session = init_db_session();
if session:
results = session.query(type(entity)).all();
session.close();
return results;
# Query a single row
def get_one(entity, key):
result = None;
session = init_db_session();
if session:
result = session.query(type(entity)).filter(
type(entity).__dict__.get(key) == entity.__dict__.get(key)
).first();
session.close();
return result;
# Raw SQL
def execute_(sql):
results = None;
session = init_db_session();
if session:
if sql.lower().startswith("select"):
            '''
            - Newer SQLAlchemy versions require wrapping raw SQL in text(sql);
            - otherwise the following exception is raised:
              Textual SQL expression '***' should be explicitly declared as text('***')
            '''
results = session.execute(text(sql)).fetchall();
else:
results = session.execute(text(sql));
session.commit();
session.close();
return results;
if __name__ == '__main__':
pass
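A short sketch of how these helpers are meant to be used (the Demo table is a throwaway example; the real entity is defined in the next file; the module path util.sqlalchemy_util is an assumption):
# Hypothetical usage of the SQLAlchemy helpers above.
from util.sqlalchemy_util import Base, Column, Integer, String, init_db, insert_, get_all, execute_

class Demo(Base):
    __tablename__ = "demo_item"
    id = Column(Integer, primary_key=True, autoincrement=True)
    code = Column(String(32))

init_db()                          # create any missing mapped tables on the configured engine
insert_(Demo(code="001"), "code")  # inserts only if no row with the same code already exists
print([d.code for d in get_all(Demo())])
print(execute_("select count(*) from demo_item"))  # raw SQL is wrapped in text() internally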
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__author__ = "HymanHu";
'''
LSTM Model
'''
import math;
import numpy as np;
import pandas as pd;
from pandas import DataFrame;
from keras.models import Sequential;
from keras.layers import LSTM, Dense, Activation;
from sklearn.preprocessing import MinMaxScaler;
from sklearn.metrics import mean_squared_error;
import matplotlib.pyplot as plt;
class LSTM_Model(object):
    '''
    data: 1-D numpy array, e.g. [1 2 3 ...]
    step_length: window length; step_length values are used to predict the next value
    '''
def __init__(self, data, step_length=1):
print("构造 LSTM_Model,data 类型 %s,data 形状 %s, 步长 %s" % (type(data), data.shape, step_length));
self.data = data;
self.step_length = step_length;
self.scaler = MinMaxScaler(feature_range=(0, 1));
self.train_data = None;
self.train_x = None;
self.train_y = None;
self.test_data = None;
self.test_x = None;
self.test_y = None;
self.predict = None;
        # Configure numpy array print formatting
np.set_printoptions(linewidth=400, threshold=6);
    # Initialize the training and test data
def init_train_test_data(self):
print("==== 初始化训练数据和测试数据 ====");
print("原始数据:%s" % self.data);
        # Convert values to float
data = self.data.astype(float);
        # Normalize with MinMaxScaler, which expects a 2-D shape [[5][1][4]...[4][1][4]];
        # flatten back to 1-D once scaling is done
data = self.scaler.fit_transform(data.reshape(-1, 1)).flatten();
print("归一化数据:%s" % data);
        # Use 4/5 of the data for training and 1/5 for testing
print("---- 数据拆分 ----");
train_length = int(len(data) * 0.8);
        # 2-D split, flattened back to 1-D:
        # train_data, test_data = data[0:train_length, :].flatten(), \
        # data[train_length:len(data), :].flatten();
        # 1-D split
self.train_data, self.test_data = data[0:train_length], data[train_length:len(data)];
print("训练数据: %s" % self.train_data);
print("测试数据: %s" % self.test_data);
    '''
    Build the model-ready data.
    data: 1-D numpy array [ 5. 1. 4. ... 1. 4. 10.]
    step_length: window length; step_length values are used to predict the next value
    1 -> 1: [[5][1][4]...[4][1][4]] ---- [ 1 4 8 ... 1 4 10]
    2 -> 1: [[5 1][1 4][4 8]...[4 4][4 1][1 4]] ---- [ 4 8 3 ... 1 4 10]
    3 -> 1: [[5 1 4][1 4 8][4 8 3]...[1 4 4][4 4 1][4 1 4]] ---- [ 8 3 12 ... 1 4 10]
    '''
def build_fit_data(self, data, data_name=""):
print("==== 构造%s模型适应数据 ====" % data_name);
data_x, data_y = [], [];
for i in range(len(data) - self.step_length):
data_x.append(data[i: i + self.step_length]);
data_y.append(data[i + self.step_length]);
x, y = np.asarray(data_x), np.asarray(data_y);
        # Reshape x to (nb_samples, timesteps, input_dim)
x = x.reshape(len(x), self.step_length, 1);
        # Reshape y
y = y.reshape(len(y), 1);
print("%s_x 数据:%s" % (data_name, x));
print("%s_y 数据:%s" % (data_name, y));
return x, y;
    '''
    Time-step LSTM regression model
    '''
def time_step_model(self):
        # Hidden units
hidden_neurons = 50;
        # Input/output units
in_out_neurons = 1;
print("==== 构造 Sequential 模型 ====");
model = Sequential();
        '''
        units: size of the hidden state inside the LSTM cell; in theory a larger value gives a more
            complex network and higher accuracy, at a higher computational cost
        input_shape: the model must be told the input size it should expect; only the first layer of a
            Sequential model needs this information, later layers infer their sizes automatically
            input_shape=(batch_dim, time_dim, feat_dim)
            input_shape=(time_dim, feat_dim)
        Batch_size: usually best left as None
        Time_step: length of the time-series window
        Input_Sizes: dimension of the input x at each time step
        activation: activation function such as relu or linear; it can also be added as a separate
            layer via model.add(Activation("linear"));
        '''
print("---- 添加 LSTM 层,该层有 %d 个隐藏神经元,relu 激活函数, 输入样本形状为 %s ----" %
(hidden_neurons, self.train_x.shape));
# model.add(LSTM(hidden_neurons, return_sequences=False,
# input_shape=(self.train_x.shape[1], self.train_x.shape[2])));
model.add(LSTM(hidden_neurons, activation='relu', input_shape=(self.train_x.shape[1], self.train_x.shape[2])));
print("---- 添加 Dense 层,该层有 %d 个输入输出神经元 ----" % (in_out_neurons,));
        # Fully connected layer
model.add(Dense(in_out_neurons));
print("---- 编译模型 ----");
model.compile(loss="mean_squared_error", optimizer="rmsprop");
print("---- 输出摘要 ----");
model.summary();
print("---- 使用训练数据训练模型 ----");
model.fit(self.train_x, self.train_y, epochs=10, validation_split=0.05);
print("---- 用模型对测试数据进行预测 ----");
predict = model.predict(self.test_x).reshape(len(self.test_y));
print("预测数据:%s" % (predict,));
print("---- 数据反归一化 ----");
        # Invert the MinMaxScaler normalization; the scaler expects a 2-D shape [[5][1][4]...[4][1][4]],
        # then flatten back to 1-D
self.predict = self.scaler.inverse_transform(predict.reshape(-1, 1)).flatten();
self.test_y = self.scaler.inverse_transform(self.test_y).flatten();
print("test_y 反归一化:%s" % (self.test_y, ));
print("预测数据反归一化:%s" % (self.predict, ));
print("---- 计算测试数据与预测数据 RMSE 误差 ----");
        # Mean squared error of the regression (equivalent to np.mean((self.predict - self.test_y) ** 2))
        MSE = mean_squared_error(self.test_y, self.predict);
print('MSE:%.2f' % MSE);
score = math.sqrt(MSE);
print('Score:%.2f' % score);
    # Plot the results
def data_graph(self):
        # Set a font that renders Chinese labels correctly
plt.rcParams['font.sans-serif'] = ['SimHei'];
        # rows, columns, index ---- 2 rows, 1 column, subplot 1
plt.subplot(2, 1, 1);
plt.plot(self.data, label="原始数据", color="black", linewidth=1);
plt.title("所有数据");
plt.xlabel("日期");
plt.ylabel("数值");
plt.subplot(2, 1, 2);
        plt.plot(self.test_y, label="测试数据", color="black", linewidth=1);
        plt.plot(self.predict, label="预测数据", color="red", linewidth=1);
plt.title("测试数据 & 预测数据");
plt.xlabel("日期");
plt.ylabel("数值");
plt.show();
    # Application entry point
def application(self):
self.init_train_test_data();
self.train_x, self.train_y = self.build_fit_data(self.train_data, "训练");
self.test_x, self.test_y = self.build_fit_data(self.test_data, "测试");
self.time_step_model();
self.data_graph();
if __name__ == '__main__':
df = pd.read_csv("/temp/twocolorball.csv", encoding="gbk").drop(labels="Unnamed: 0", axis=1);
df = df.sort_values(by="期号", ascending=True);
data = np.asarray(df["红球1"]);
# data = np.array(list(range(1, 101)));
lstm = LSTM_Model(data=data, step_length=3);
lstm.application();
print(int(round(lstm.predict[-1])));
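The windowing that build_fit_data performs can be hard to picture from the docstring alone, so here is a standalone sketch of the same 3 -> 1 transform (plain NumPy, no Keras needed):
# Standalone illustration of the sliding-window reshape used by build_fit_data.
import numpy as np

data = np.array([5, 1, 4, 8, 3, 12], dtype=float)
step = 3
x = np.array([data[i:i + step] for i in range(len(data) - step)])
y = np.array([data[i + step] for i in range(len(data) - step)])
x = x.reshape(len(x), step, 1)  # (samples, timesteps, features), as the LSTM layer expects
y = y.reshape(len(y), 1)
print(x.shape, y.shape)         # (3, 3, 1) (3, 1)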
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__author__ = "HymanHu";
'''
Double-color-ball entity bean
'''
from util.sqlalchemy_util import *;
class Double_Color_Ball(Base):
    # Table name
__tablename__ = "double_color_ball";
    # Table creation arguments
__table_args__ = {
"mysql_charset": "utf8"
};
id = Column(Integer, primary_key=True, autoincrement=True, nullable=False);
issur_code = Column(String(255));
occur_date = Column(Date);
red_ball = Column(String(255));
blue_ball = Column(String(255));
recommend = Column(Text(500));
create_date = Column(DateTime);
update_date = Column(DateTime);
if __name__ == '__main__':
init_db();
# dcb = Double_Color_Ball(
# issur_code="2023111",
# occur_date="2023-08-29 14:14:14",
# red_ball="1,2,3,6,13,31",
# blue_ball="7",
# recommend="1,2,3,14,23,33 11",
# create_date="2023-08-29 14:14:14",
# update_date="2023-08-29 14:14:14"
# )
# insert_(dcb, "issur_code");
# result = get_all(dcb);
# print(result);
# for item in result:
# print(item.__dict__);
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__author__ = "HymanHu";
'''
Double-color-ball spider
http://kaijiang.zhcw.com/zhcw/html/ssq/list.html
'''
import requests;
from bs4 import BeautifulSoup;
import re;
from entity.Double_Color_Ball import *;
from datetime import datetime;
import time;
headers = {
"User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/116.0",
"Cookie":"Hm_lvt_692bd5f9c07d3ebd0063062fb0d7622f=1691204419; _ga_9FDP3NWFMS=GS1.1.1691204417.2.1.1691204437.0.0.0; _ga=GA1.2.812812942.1688107107; Hm_lvt_12e4883fd1649d006e3ae22a39f97330=1691204418"
}
headers_sfac={
"User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/116.0",
"Token":"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJhdWQiOiIxIiwicm9sZSI6IkFkbWluIiwidXNlckltYWdlIjoiaHR0cDovL3d3dy5zZmFjLnh5ejo4MDAwL2ltYWdlcy9wcm9maWxlLzE2ODg3MTE5MjI0NDkuanBnIiwiaWQiOjEsImV4cCI6MTY5MzM2NzMxNiwidXNlck5hbWUiOiJhZG1pbiIsImlhdCI6MTY5MzI4MDkxNn0.ND2M1Iu19w2x0V_-TLTeBjF1jk80HQYdBAkT2fsEMeM"
}
# Fetch a single page of draw data
def get_dcb_page_data(url):
print("单页数据:%s" % url);
    # Send the request to the target URL and get the response
r = requests.get(url, headers=headers);
    # Handle the response according to its status code
if r.status_code == 200:
        # Set the response encoding
r.encoding = r.apparent_encoding;
        # Print the raw response text
        # print(r.text);
        # Parse the response content
        # Build the BeautifulSoup object
bs = BeautifulSoup(markup=r.text, features="html.parser");
        # Get all tr tags
tr_list = bs.find_all(name="tr");
for index, tr in enumerate(tr_list):
if index == 0 or index == 1 or index == (len(tr_list) - 1):
continue;
# print(tr);
            # Build the DCB entity
dcb = Double_Color_Ball(
issur_code=re.findall('<td align="center">(.*?)</td>', str(tr))[1],
occur_date=re.findall('<td align="center">(.*?)</td>', str(tr))[0],
red_ball=" ".join(re.findall('<em class="rr">(.*?)</em>', str(tr))),
blue_ball=re.findall('<em>(.*?)</em>', str(tr))[0],
create_date=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
update_date=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
);
# print(dcb.__dict__);
            # Insert into the database
insert_(dcb, "issur_code");
# Fetch all pages of draw data
def get_dcb_all_data(page_size=153):
    # Build the list of page URLs
urls = list("http://kaijiang.zhcw.com/zhcw/html/ssq/list_%d.html" % page for page in range(2, page_size + 1));
urls.insert(0, "http://kaijiang.zhcw.com/zhcw/html/ssq/list.html");
for url in urls:
get_dcb_page_data(url);
time.sleep(5);
return True;
# Fetch a single page of data from the sfac API
def get_dcb_page_data_sfac(url, currentPage=1, pageSize=10):
print("单页数据:%s" % url);
search = {"currentPage":currentPage,"pageSize":pageSize,"keyword":"","sort":"issue_no","direction":"desc"};
    # Send the request to the target URL and get the response
r = requests.post(url, json=search, headers=headers_sfac);
    # Handle the response according to its status code
if r.status_code == 200:
        # Set the response encoding
r.encoding = r.apparent_encoding;
print(r.json().get("total"));
for item in r.json().get("list", []):
            # Build the DCB entity
dcb = Double_Color_Ball(
issur_code=item.get("issueNo"),
occur_date=item.get("awardDate"),
red_ball=item.get("redBall"),
blue_ball=item.get("blueBall"),
create_date=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
update_date=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
);
print(dcb.__dict__);
insert_(dcb, "issur_code");
# Fetch all data from the sfac API
def get_dcb_all_data_sfac(page_size=31):
url_sfac = "http://538b537e25.zicp.vip:13595/api/economy/bicolorSpheres";
page_size = 100;
for page in range(1, page_size + 1):
get_dcb_page_data_sfac(url_sfac, page, page_size);
time.sleep(3);
if __name__ == '__main__':
# url = "http://kaijiang.zhcw.com/zhcw/html/ssq/list.html";
# get_dcb_page_data(url);
# get_dcb_all_data();
# url_sfac= "http://538b537e25.zicp.vip:13595/api/economy/bicolorSpheres";
# get_dcb_page_data_sfac(url_sfac);
get_dcb_all_data_sfac();
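To make the regex extraction in get_dcb_page_data easier to follow, here are the same patterns run against a simplified, hand-written table row (the real markup on kaijiang.zhcw.com may differ in detail):
# Hand-written <tr> used only to illustrate the regex extraction; not the live page markup.
import re

tr = ('<tr><td align="center">2023-08-29</td><td align="center">2023100</td>'
      '<td><em class="rr">01</em><em class="rr">02</em><em class="rr">03</em>'
      '<em class="rr">06</em><em class="rr">13</em><em class="rr">31</em><em>07</em></td></tr>')

print(re.findall('<td align="center">(.*?)</td>', tr)[1])     # issue code: 2023100
print(re.findall('<td align="center">(.*?)</td>', tr)[0])     # draw date: 2023-08-29
print(" ".join(re.findall('<em class="rr">(.*?)</em>', tr)))  # red balls: 01 02 03 06 13 31
print(re.findall('<em>(.*?)</em>', tr)[0])                    # blue ball: 07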
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__author__ = "HymanHu";
'''
Double-color-ball data processing
'''
from util.sqlalchemy_util import *;
import numpy as np;
import pandas as pd;
from pandas import Series, DataFrame;
import random;
from pyecharts.faker import Faker;
from pyecharts import options as opts;
from pyecharts.charts import Bar, Bar3D, Line, Line3D, Pie, Map, Geo, Funnel, Grid, Tab, Page;
from statsmodels.tsa.api import SimpleExpSmoothing, Holt;
from util.LSTM_Model import *;
# Fetch all data and build the DataFrame
def build_dcb_df():
print("======== 初始化df数据 ========");
    # Read data from the database
sql = "select issur_code, occur_date, red_ball, blue_ball from double_color_ball order by occur_date";
result = execute_(sql);
column_list = ["期号", "开奖日期", "红球", "蓝球"];
df = DataFrame(data=result, columns=column_list);
df = pd.concat([
df[["期号", "开奖日期"]],
df["红球"].str.split(" ", expand=True).rename(
columns={0: '红球1', 1: '红球2', 2: '红球3', 3: '红球4', 4: '红球5', 5: '红球6'}),
df["蓝球"],
], axis=1);
print(df);
return df;
# Plot the winning-number trend for the most recent 30 draws
def draw_dcb_lines(df):
print("======== 绘制走势图 ========");
    # Prepare the data
occure_date = np.asarray(df["开奖日期"][-30:].apply(lambda item:str(item))).tolist();
red1 = np.asarray(df["红球1"][-30:].apply(lambda item:int(item))).tolist();
red2 = np.asarray(df["红球2"][-30:].apply(lambda item:int(item))).tolist();
red3 = np.asarray(df["红球3"][-30:].apply(lambda item:int(item))).tolist();
red4 = np.asarray(df["红球4"][-30:].apply(lambda item:int(item))).tolist();
red5 = np.asarray(df["红球5"][-30:].apply(lambda item:int(item))).tolist();
red6 = np.asarray(df["红球6"][-30:].apply(lambda item:int(item))).tolist();
blue = np.asarray(df["蓝球"][-30:].apply(lambda item:int(item))).tolist();
Line().add_xaxis(
xaxis_data=occure_date
).add_yaxis(
series_name="红1",
y_axis=red1,
itemstyle_opts=opts.ItemStyleOpts(color=Faker.rand_color())
).add_yaxis(
series_name="红2",
y_axis=red2,
itemstyle_opts=opts.ItemStyleOpts(color=Faker.rand_color())
).add_yaxis(
series_name="红3",
y_axis=red3,
itemstyle_opts=opts.ItemStyleOpts(color=Faker.rand_color())
).add_yaxis(
series_name="红4",
y_axis=red4,
itemstyle_opts=opts.ItemStyleOpts(color=Faker.rand_color())
).add_yaxis(
series_name="红5",
y_axis=red5,
itemstyle_opts=opts.ItemStyleOpts(color=Faker.rand_color())
).add_yaxis(
series_name="红6",
y_axis=red6,
itemstyle_opts=opts.ItemStyleOpts(color=Faker.rand_color())
).add_yaxis(
series_name="蓝球",
y_axis=blue,
itemstyle_opts=opts.ItemStyleOpts(color=Faker.rand_color())
).set_global_opts(
title_opts=opts.TitleOpts(title="双色球走势图", subtitle="近30期", pos_left="10%"),
        # Position of the series_name legend
legend_opts=opts.LegendOpts(pos_left="40%"),
).render(
path="/projectCode/clazz/wish_web/static/html/dcb.html"
);
'''
- Add a number to the number pool
- l: the number pool
- number: the number to add
- is_blue: whether it is the blue ball
'''
def add_number_to_pool(l, number, is_blue):
max = 16 if is_blue else 33;
    # The blue ball is appended directly
if is_blue:
l.append(number);
return l;
    elif number not in l:
l.append(number);
l.sort();
return l;
else:
        return add_number_to_pool(l, random.randint(1, max), is_blue);
# Randomly generate a double-color-ball pick
def random_dcb_predict():
print("======== 随机算法预测 ========");
l = [];
for i in range(1, 8):
is_blue = False if i < 7 else True;
max = 16 if is_blue else 33;
add_number_to_pool(l, random.randint(1, max), is_blue);
print(l);
return l;
# Simple exponential smoothing prediction
def smoothing_dcb_predict(df):
print("======== 平滑指数预测 ========");
l = [];
for i in range(1, 8):
column = "红球%d" % i if i < 7 else "蓝球";
is_blue = False if i < 7 else True;
ses = SimpleExpSmoothing(
np.asarray(df[column].apply(lambda item:int(item)))
).fit(
smoothing_level=random.randint(1, 10) / 10,
optimized=False
);
        # Predict with the fitted model; returns an array such as [3.98919522]
result = ses.predict();
        number = int(round(result[0], 0));
        # print(result, number);
        add_number_to_pool(l, number, is_blue);
print(l);
return l;
# Holt prediction
def holt_dcb_predict(df):
print("======== Holt预测 ========");
l = [];
for i in range(1, 8):
column = "红球%d" % i if i < 7 else "蓝球";
is_blue = False if i < 7 else True;
holt = Holt(
np.asarray(df[column].apply(lambda item: int(item)))
).fit(
smoothing_level=random.randint(1, 10) / 10,
smoothing_trend = random.randint(1, 10) / 10,
optimized = False
);
        # Predict with the fitted model; returns an array such as [3.98919522]
result = holt.predict();
        number = int(round(result[0], 0));
        add_number_to_pool(l, number, is_blue);
print(l);
return l;
# LSTM prediction
def lstm_dcb_predict(df):
print("======== LSTM 预测 ========");
l = [];
for i in range(1, 8):
column = "红球%d" % i if i < 7 else "蓝球";
is_blue = False if i < 7 else True;
lstm = LSTM_Model(data=np.asarray(df[column].apply(lambda item: int(item))), step_length=3);
lstm.application();
        number = int(round(lstm.predict[-1]));
        add_number_to_pool(l, number, is_blue);
print(l);
return l;
def save_predict_data(result=""):
print("======== 保存预测结果 ========");
print(result);
    # Look up the most recent row in the database
query_sql = "select id from double_color_ball order by occur_date desc limit 1";
ids = execute_(query_sql);
id = 0 if len(ids) == 0 else ids[0][0];
    # Update the recommendation for that row
update_sql = "update double_color_ball set recommend='%s' where id=%s;" % (result, id);
execute_(update_sql);
# Orchestration entry point
def application():
result = [];
df = build_dcb_df();
draw_dcb_lines(df);
l = random_dcb_predict();
result.append("<p>随机预测:%s</p>" % l);
l = smoothing_dcb_predict(df);
result.append("<p>平滑指数预测:%s</p>" % l);
l = holt_dcb_predict(df);
result.append("<p>Holt 预测:%s</p>" % l);
l = lstm_dcb_predict(df);
result.append("<p>LSTM 预测:%s</p>" % l);
save_predict_data("".join(result));
return "".join(result);
if __name__ == '__main__':
# df = build_dcb_df();
# draw_dcb_lines(df);
# l = random_dcb_predict();
# print(l);
# l = smoothing_dcb_predict(df);
# print(l);
# l = holt_dcb_predict(df);
# print(l);
# l = lstm_dcb_predict(df);
# print(l);
# save_predict_data("aaaaa");
application();
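For reference, a toy version of the smoothing-based forecasts used above; forecast(1) is used here as the explicit one-step-ahead call, whereas the functions above rely on predict() with its defaults:
# Toy illustration of the statsmodels smoothing models used in the prediction functions.
import numpy as np
from statsmodels.tsa.api import SimpleExpSmoothing, Holt

series = np.array([5, 1, 4, 8, 3, 12, 7, 9], dtype=float)
ses = SimpleExpSmoothing(series).fit(smoothing_level=0.4, optimized=False)
holt = Holt(series).fit(smoothing_level=0.4, smoothing_trend=0.3, optimized=False)
print(int(round(ses.forecast(1)[0])))   # one-step-ahead forecast, rounded to a ball number
print(int(round(holt.forecast(1)[0])))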
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__author__ = "JiangHu";
'''
- Models for the Spider module
'''
from django.db import models;
# Note: the class name here must not clash with the one used in the Wish_Spider project
class DoubleColorBall(models.Model):
id = models.AutoField(primary_key=True);
issur_code=models.CharField(max_length=255, blank = True, null = True);
occur_date = models.DateField(blank=True, null=True);
red_ball=models.CharField(max_length=255, blank = True, null = True);
blue_ball=models.CharField(max_length=255, blank = True, null = True);
recommend=models.TextField(max_length=500, blank = True, null = True);
    create_date=models.DateTimeField(auto_now_add=True, blank=True, null=True);
update_date=models.DateTimeField(auto_now=True, blank=True, null=True);
    # Convert the model instance to a dict for API responses
def dcb_dict(self):
dcb_dict = {};
dcb_dict["id"] = self.id;
dcb_dict["issurCode"] = self.issur_code;
dcb_dict["occurDate"] = self.occur_date;
dcb_dict["redBall"] = self.red_ball;
dcb_dict["blueBall"] = self.blue_ball;
dcb_dict["recommend"] = self.recommend;
dcb_dict["createDate"] = self.create_date;
dcb_dict["updateDate"] = self.update_date;
return dcb_dict;
    # Explicit table name; by default Django would generate appname_classname, e.g. gzbd_epidemic
class Meta:
db_table = ('double_color_ball');
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__author__ = "JiangHu";
'''
- Page VO objects
'''
class Result(object):
def __init__(self, status, message, data=None):
self.status = status;
self.message = message;
self.data = data;
def result(self):
if self.data:
return {"status": self.status, "message": self.message, "data": self.data};
else:
return {"status": self.status, "message": self.message};
class Search_Vo(object):
def __init__(self, data):
self.current_page = data.get("currentPage", 1);
self.page_size = data.get("pageSize", 5);
self.sort = data.get("sort", "");
if self.sort == "":
self.sort = "id";
self.direction = data.get("direction", "asc");
if self.direction.lower() == "desc":
self.sort = "-" + self.sort;
self.keyword = data.get("keyword", "");
def result(self):
search = {};
search["currentPage"] = self.current_page;
search["pageSize"] = self.page_size;
search["sort"] = self.sort;
search["direction"] = self.direction;
search["keyword"] = self.keyword;
return search;
class Page_Info(object):
    def __init__(self, total=0, current_page=1, page_size=5, list=None):
        self.total = total;
        self.current_page = current_page;
        self.page_size = page_size;
        self.list = list if list is not None else [];
def result(self):
page_info_dict = {}
page_info_dict["total"] = self.total;
page_info_dict["currentPage"] = self.current_page;
page_info_dict["pageSize"] = self.page_size;
page_info_dict["list"] = self.list;
return page_info_dict;
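A quick, framework-free check of the VO classes above (the module path app_common.page_vo matches the import used in the views below):
# Exercise the VO helpers with a sample payload.
from app_common.page_vo import Result, Search_Vo, Page_Info

payload = {"currentPage": 2, "pageSize": 10, "sort": "issur_code", "direction": "desc", "keyword": "2023"}
vo = Search_Vo(payload)
print(vo.sort)       # "-issur_code" because direction is desc
print(vo.result())

print(Result(200, "ok", data=[1, 2]).result())
print(Page_Info(total=42, current_page=2, page_size=10, list=[{"id": 1}]).result())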
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__author__ = "JiangHu";
'''
- Views (service layer) for the Spider module
'''
# Add the other project's root directory to sys.path
import sys, os;
sys.path.append(r'D:\projectCode\clazz\wish_spider');
from django.shortcuts import render, HttpResponse;
from django.http import JsonResponse;
import json;
from app_spider.models import *;
from app_common.page_vo import *;
from django.db.models import Q;
from django.core.paginator import Paginator, PageNotAnInteger, EmptyPage;
from spider.dcb_spider import *;
from data_process.dcb_data_process import *;
# Return a plain string
def hello_world(request):
return HttpResponse("Hello World!");
'''
- Insert dcb | update dcb
- insert_dcb | dcb | json | post | Result ----- /api/spider/dcb
- update_dcb | dcb | json | put | Result ----- /api/spider/dcb
'''
def edit_dcb(request):
if request.method == "POST":
        # Read the dcb JSON payload
dcb_dict = json.loads(request.body);
        # Build the dcb object
dcb = DoubleColorBall(
issur_code=dcb_dict.get("issurCode", ""),
occur_date=dcb_dict.get("occurDate", ""),
red_ball=dcb_dict.get("redBall", ""),
blue_ball=dcb_dict.get("blueBall", ""),
recommend=dcb_dict.get("recommend", ""),
);
        # Check whether data for this issue already exists in the database
temp = DoubleColorBall.objects.filter(issur_code=dcb.issur_code).first();
if temp:
return JsonResponse(Result(500, "该期数据已经存在。").result());
else:
dcb.save();
return JsonResponse(Result(200, "插入成功。", dcb.dcb_dict()).result());
elif request.method == "PUT":
        # Read the dcb JSON payload
dcb_dict = json.loads(request.body);
        # Load the dcb object from the database by id
dcb = DoubleColorBall.objects.get(id=dcb_dict.get("id"));
dcb.issur_code = dcb_dict.get("issurCode", "");
dcb.occur_date = dcb_dict.get("occurDate", "");
dcb.red_ball = dcb_dict.get("redBall", "");
dcb.blue_ball = dcb_dict.get("blueBall", "");
dcb.recommend = dcb_dict.get("recommend", "");
        # Check whether data for this issue already exists in the database
temp = DoubleColorBall.objects.filter(issur_code=dcb.issur_code).first();
if temp and temp.id != dcb.id:
return JsonResponse(Result(500, "该期数据已经存在。").result());
else:
dcb.save();
return JsonResponse(Result(200, "修改成功。", dcb.dcb_dict()).result());
else:
return JsonResponse(Result(500, "不支持该请求类型。").result());
'''
- delete dcb | get dcb
- delete_dcb_by_id | id | path | Result ---- /api/spider/dcb/1
- get_dcb_by_id | id | path | dcb ------ /api/spider/dcb/1
'''
def delete_get_dcb(request, id):
if request.method == "DELETE":
DoubleColorBall.objects.filter(id=id).delete();
return JsonResponse(Result(200, "删除成功。").result());
elif request.method == "GET":
        dcb = DoubleColorBall.objects.filter(id=id).first();
if dcb:
return JsonResponse(dcb.dcb_dict());
else:
return JsonResponse({});
else:
return JsonResponse(Result(500, "不支持该请求类型。").result());
'''
- Paged dcbs search endpoint
- get_dcbs_by_search | Search | json | page_info ----- /api/spider/dcbs
'''
def get_dcbs_by_search(request):
if request.method == "POST":
        # Read the search JSON payload
search_dict = json.loads(request.body);
search_vo = Search_Vo(search_dict);
        '''
        - Query all objects filtered by keyword and ordered by the sort field
        - Fuzzy match across several fields: Q(field1__icontains=keyword) | Q(field2__icontains=keyword)
        - icontains vs contains: case-insensitive vs case-sensitive
        '''
dcbs = DoubleColorBall.objects.filter(
Q(issur_code__icontains=search_vo.keyword) |
Q(red_ball__icontains=search_vo.keyword) |
Q(blue_ball__icontains=search_vo.keyword)
).order_by(search_vo.sort);
        # Create a Paginator and use it to page the queryset
paginator = Paginator(dcbs, search_vo.page_size);
try:
dcbs = paginator.page(search_vo.current_page);
except PageNotAnInteger:
search_vo.current_page = 1;
dcbs = paginator.page(1);
except EmptyPage:
search_vo.current_page = paginator.num_pages;
dcbs = paginator.page(paginator.num_pages);
        # Return the Page_Info dict
dcb_list = list(dcb.dcb_dict() for dcb in dcbs.object_list);
return JsonResponse(
Page_Info(
total=paginator.count,
current_page=search_vo.current_page,
page_size=search_vo.page_size,
list=dcb_list
).result());
else:
return JsonResponse(Result(500, "不支持该请求类型。").result());
# Test page
def hello_world_page(request):
    content = {};
    content["name"] = "hj";
    content["age"] = 18;
    return render(request, "spider/helloWorld.html", content);
# dcbs page
def get_dcbs_page(request):
    content = {};
    return render(request, "spider/dcbs.html", content);
# Trigger the spider
def get_recent_dcbs_data(request):
if request.method == "GET":
print("==================");
result = get_dcb_all_data(page_size=1);
return JsonResponse(Result(200, "Success.", result).result());
# Trigger data prediction and chart generation
def get_dcb_predict(request):
if request.method == "GET":
result = application();
return JsonResponse(Result(200, "Success.", result).result());
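An example client call against the paging endpoint once the dev server is running (host and port are assumptions; the path comes from urls.py below, and a plain POST may need CSRF exemption on the view depending on project settings):
# Hypothetical client-side call to the /api/spider/dcbs endpoint.
import requests

search = {"currentPage": 1, "pageSize": 5, "sort": "issur_code", "direction": "desc", "keyword": ""}
r = requests.post("http://127.0.0.1:8000/api/spider/dcbs", json=search)
print(r.status_code)
data = r.json()
print(data.get("total"), len(data.get("list", [])))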
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__author__ = "JiangHu";
"""
urls
"""
from django.contrib import admin;
from django.urls import path, re_path;
from app_account import views as av;
from app_common import views as cv;
from app_spider import views as sv;
urlpatterns = [
path('admin/', admin.site.urls),
# ======== account ========
# ======== common ========
# ======== spider ========
    re_path(r'^helloworld$', sv.hello_world),  # returns a string
    re_path(r'^api/spider/dcb$', sv.edit_dcb),  # returns JSON
    re_path(r'^api/spider/dcb/(\d+)$', sv.delete_get_dcb),  # returns JSON
    re_path(r'^api/spider/dcbs$', sv.get_dcbs_by_search),  # returns JSON
    re_path(r'^spider/helloWorld$', sv.hello_world_page),  # test page
    re_path(r'^spider/dcbs$', sv.get_dcbs_page),  # dcbs page
    re_path(r'^api/spider/recent/dcbs$', sv.get_recent_dcbs_data),  # crawl data
    re_path(r'^api/spider/dcb/predict$', sv.get_dcb_predict),  # data prediction
]