'''
hello world
'''
# ==== input and output ====
print("Hello World");
# 遇到逗号,在拼接字符串的时候自动添加空格
print("Hello", "World");
# name = input();
# print("Hello", name);
name = "HymanHu";
print("Hello %s"%name);
first_name = "Hu";
last_name = "Hyman";
print("Hello %s%s"%(last_name, first_name));
# ==== 数据类型 ====
a = 33;
print("a 的值是:%s,类型为:%s"%(a, type(a)));
a = 33.22;
print("a 的值是:%s,类型为:%s"%(a, type(a)));
a = 3 < 2;
print("a 的值是:%s,类型为:%s"%(a, type(a)));
a = 3 + 4j;
print("a 的值是:%s,类型为:%s"%(a, type(a)));
a = "hujiang";
print("a 的值是:%s,类型为:%s"%(a, type(a)));
a = b'hujiang';
print("a 的值是:%s,类型为:%s"%(a, type(a)));
a = r'hujiang';
print("a 的值是:%s,类型为:%s"%(a, type(a)));
a = u'hujiang';
print("a 的值是:%s,类型为:%s"%(a, type(a)));
a = None;
print("a 的值是:%s,类型为:%s"%(a, type(a)));
a = ["cdsa", None, 12, 12.34, True, True];
print("a 的值是:%s,类型为:%s"%(a, type(a)));
a = ("cdsa", None, 12, 12.34, True, True);
print("a 的值是:%s,类型为:%s"%(a, type(a)));
a = {"name":"hujiang", "age":18};
print("a 的值是:%s,类型为:%s"%(a, type(a)));
a = {"name", "age", "age"};
print("a 的值是:%s,类型为:%s"%(a, type(a)));
# ==== 运算符 ====
print((100 + 22.2 - 5 * 2) / 2);
print(5 // 2); # 地板除,取整
print(5 % 2);
print(2 ** 3); # 乘方
a = True and False;
print("a的数据类型%s, 值%s"%(type(a), a));
a = True or False;
print("a的数据类型%s, 值%s"%(type(a), a));
a = not False;
print("a的数据类型%s, 值%s"%(type(a), a));
"aa" == "ss"; # 字符串比较
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__author__ = "HuJiang";
# ==== ASCII转换 ====
print("98-->%s;a-->%s"%(chr(98), ord('a')));
# ==== encode && decode ====
print("cdsa".encode("ascii")); # 输出b'cdsa',转化为bytes类型;
#print("中文".encode("ascii")); # 报错,ascii只定义了127个字符,中文无法解析;
print("中文".encode("utf-8")); # 输出b'\xe4\xb8\xad\xe6\x96\x87',转化为bytes类型,无法显示为ASCII字符的字节,用\x##显示;
print(b'cdsa'.decode("ascii")); # 输出cdsa,将bytes类型按照ascii解码;
print(b'\xe4\xb8\xad\xe6\x96\x87'.decode("utf-8")); # 输出中文,将bytes类型按照utf8解码;
# ==== 前缀字符串 ====
print(u'中文'); # 后面字符串是以Unicode编码
print(r'dddd'); # 后面字符串是普通字符串
print(b'qwerr'); # 后面是bytes
# ==== len ====
print(len("aaa")); # 3,对于str计算字符数
print(len("中文")); # 2,对于str计算字符数
print(len("aaa".encode("utf-8"))); # 3,对于bytes计算字节数
print(len("中文".encode("utf-8"))); # 6,对于bytes计算字节数 ---- 可见utf8中一个中文占3个字节
# ==== replace ====
a = "cdcacdsahymancdscda";
print(a.replace("hyman", "----"));
print(a);
# ==== find ====
print("cdsacdas".find("a")); # 字符串第一次出现下标,没有-1
print("cdsacdas".rfind("a")); # 字符串最后一次出现下标,没有-1
# ==== isspace ====
print(" ".isspace());
# ==== contain ====
temp = "全省累计报告新型冠状病毒肺炎确诊病例"
print(temp.__contains__("全省累计"))
# ==== 字符串格式化 ====
print("%d----%2d----%02d"%(2, 3, 4)); # 2d(不足两位左边补空格)、02d(不足两位,左边补0)
print("%f----%.2f"%(222.2, 333.345)); # .2f(保留两位小数,四舍五入)
print("%x"%333); # 格式化为十六进制
print("%s%%%s"%("3", "2"));
print(list("%s" % x for x in range(2, 10))); # 将2-10生成器,转化为字符串list
# **同学你好,在**学期,你的成绩为**,提高了**%
print("%s同学你好,在%02d学期,你的成绩为%.2f,提高了%.2f%%"%("hujiang", 2, 87.3456, 1.2345));
# 生成 http://www.sfac.xyz/page_* 列表
print(list("http://www.sfac.xyz/page_%d" % page for page in range(1, 11)));
print("Hi {0}, 成绩提高了{1:.1f}%".format("小明", 1.234));
print("Hi {0}, 成绩提高了{1}%".format("小明", "%.1f"%1.234));
print("-".join(["a", "b", "c"]));
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__author__ = "HymanHu";
import re;
# ==== match & search ====
# 从头开始匹配
ma = re.match("c", "adcda");
print(ma);
# 从 anywhere 匹配
se = re.search("c", "adcda");
print(se);
# ==== 匹配字符串 ====
email_re = "^[\w-]+(\.[\w-]+)*@[\w-]+(\.[\w-]+)+$";
# 如果匹配成功,将返回一个Math对象,失败则返回None
if re.match(email_re, "hujiangux@163.com"):
print("ok");
else:
print("error");
# ==== 切分字符串 ====
print("a b c".split(" "));
print(re.split(r"\s+", "a b c"));
print(re.split(r"[\s\,\;]+", "a,b;; c d"));
# ==== 分组 ====
# 分组提取电话号码
math = re.match(r"^(\d{3})-(\d{3,8})$", "010-12345");
print(math.groups()); # ('010', '12345')
print(math.group(0)); # 010-12345
print(math.group(1)); # 010
print(math.group(2)); # 12345
# 分组提起时间
math = re.match(r"^(0[0-9]|1[0-9]|2[0-3]|[0-9])\:"
r"(0[0-9]|1[0-9]|2[0-9]|3[0-9]|4[0-9]|5[0-9]|[0-9])\:"
r"(0[0-9]|1[0-9]|2[0-9]|3[0-9]|4[0-9]|5[0-9]|[0-9])$", "19:05:30");
print(math.groups());
# 分组提取数字
temp = "截至8月20日0时,全省累计报告新型冠状病毒肺炎确诊病例627例(其中境外输入86例)," \
"累计治愈出院609例,死亡3例,目前在院隔离治疗15例,551人尚在接受医学观察。 ";
# 前后不添加 ^$,添加后强制起始和结为,如果解析的字符串有空格,则无法进行匹配
temp_re = r'截至8月20日0时,全省累计报告新型冠状病毒肺炎确诊病例(\d+)例\(其中境外输入(\d+)例\),' \
r'累计治愈出院(\d+)例,死亡(\d+)例,目前在院隔离治疗(\d+)例,(\d+)人尚在接受医学观察。';
ma = re.match(temp_re, temp);
print(ma.groups());
print(ma.group(0));
print(ma.group(1));
# 分组提取字符
results = re.findall('<link rel="assets" href="(.*?)">', '<link rel="assets" href="https://github.githubassets.com/">');
print(results);
# 贪婪匹配
print(re.match(r"^(\d+)(0*)$", "102300").groups()); # ('102300', '')
print(re.match(r"^(\d+?)(0*)$", "102300").groups()); # ('1023', '00')
email_re = r'^[\w-]+(\.[\w-]+)*@[\w-]+(\.[\w-]+)+$';
email_name_re = r'<?([^@|^>]*).*@.*';
print(re.match(email_re, "hyman@163.com"));
print(re.match(email_re, "bill.gates@microsoft.com"));
print(re.match(email_re, "y_cat-st@example.com"));
print(re.match(email_name_re, "<Tom Paris> tom@voyager.org").groups());
print(re.match(email_name_re, "tom@voyager.org").groups());
# ==== list ====
l = ["hujiang", 18, True, 13.14, None];
l = list(range(10));
# 从前取值,index从0开始往后;从后取值,index从-1开始往前
print(l[0] + "----" + l[-1]);
# 集合尾部添加元素
l.append("hymanhu");
# l1.append(l2);
# 将后面集合的元素添加到前面集合,注意和append的区别,append是将append的整体作为一个元素纳入前面集合;
l += list(range(3));
# 集合指定位置插入元素
l.insert(1, "hujiang");
# 按照索引删除元素,无参数时从最后一位删除
l.pop();
l.pop(0);
# 删除首个符合条件的元素,不是索引
l.remove(5);
# 直接对集合元素进行赋值
l[0] = "hymanHu111";
print("list: %s, length: %s" % (l, len(l)));
# list 排序
l.sort(key=lambda item:item, reverse=True);
print(l);
# list 遍历
l = list(range(1, 10));
print(list(item for item in l if item > 4));
# ==== tuple ====
t = tuple(range(10));
t = ("aads", 123, True, None, 12.3);
# 定义只有一个元素的元祖,元素后追加“,”,以免误解成数学计算意义上的括号
t = ("cdsa",);
# 集合作为元祖的元素,我们可以修改集合的元素
t = ("vsv", ["aaa", "sss"]);
t[1][1] = "bbbb";
print("tuple: %s, length: %s"%(t, len(t)));
# ==== dict ====
d = {"name":"hyman","age":33,"money":22.3};
print("dict: %s, length: %s"%(d, len(d)));
# get 取值,没有返回 None,也可给定默认值
print(d.get("name"), d.get("name1"), d.get("name1", "hujiang"));
# 赋值取值
d["aaaa"] = "aaaa";
d["name"] = "hymanhu1";
print(d["name"], d["age"]);
# 删除
d.pop("money");
print("dict: %s, length: %s"%(d, len(d)));
# 循环
for key in d:
print(key, d[key]);
# 合并dict
dict_1 = {"name":"hujiang", "age":22}
dict_2 = {"sex":"man"}
print(dict(dict_1, **dict_2))
# ==== set ====
s1 = {"name", "name", "age", "money"};
s2 = set(["name", "name", "age", "money"]);
s = set(["aaa", 123, 123, True]);
s.add("fdsaa");
# 删除首个符合条件的元素
s.remove(123);
# 删除第一个元素,无参
s.pop();
print("set: %s, length: %s"%(s, len(s)));
# 交集、合集
s2 = set([123, "fdcasc"]);
print(s & s2);
print(s | s2);
# ==== 条件判断 ====
a = 20;
if a < 10:
print("aaaa");
elif 10 <= a < 20:
print("bbb");
else:
print("ccc");
a = " ";
if a and a.strip():
print(a);
else:
print("Null string");
# 三目运算符
a, b, c = 1, 2, 3;
print(a if (b > c) else c);
# ==== 循环 ====
# ---- for 循环 ----
a = list(range(10));
sum = 0;
for x in a:
print(x);
sum += x;
print("sum: %s"%sum);
# ---- for 循环下标 ----
l = ["cdsa", 32, 33.2, None, True];
for index, item in enumerate(l):
print(index, item);
# ---- while 循环 ----
sum = 0;
i = 0;
while i < 10:
sum += i;
i += 1;
print("sum: %s"%sum);
i = 0;
while i < 10:
if i > 5:
break;
i += 1;
print(i);
# ==== 变量、常量 ====
MAX_VALUE = 333;
a = 'aaa';
b = a;
a = 'sss';
print("%s====%s===%s"%(MAX_VALUE, a, b));
a = int(33.3); # int 函数在内建作用域
__name = "HymanHu"; # 全局作用域
def fun():
name = "JiangHu"; # 闭包函数外的函数域
print(name);
def fun2():
name = "Hawkist"; # 局部作用域
print(name);
fun();
import math;
# ==== 位置参数 ====
def count(start, end = 5):
if not isinstance(start, (int, float)):
raise TypeError("Bad type");
count = 0;
while start <= end:
count += start;
start += 1;
return count;
print(count(1, 100));
# ==== 可变参数 ====
def cal(*number):
sum = 0;
for i in number:
sum += i;
return sum;
print(cal());
print(cal(1,2,3,4,5,77));
# ==== 可变关键字参数 ====
def person(name, age, **kwargs):
if "city" in kwargs:
pass;
print("name:%s;age:%s;city:%s"%(name, age, kwargs.get("city")));
kwargs = {"city":"chengdu"};
person("hj",44,**kwargs);
person("hj", 33, gender="M", city="chengdu");
# ==== 命名关键字参数 ====
def person(name, age, *, city):
print("name:%s;age:%s;city:%s"%(name, age, city));
person("hj", 44, city="chengdu");
# ==== 多返回值 ====
def move(x, y, step, angle):
nx = x + step * math.cos(angle);
ny = y + step * math.sin(angle);
return nx, ny;
print(move(12, 22, 4, 30));
# ==== 空函数 ====
def fun():
pass;
print(fun());
# ==== 递归函数(计算阶乘) ====
def fact(number):
if number == 1:
return number;
return number * fact(number - 1);
print(fact(5));
# 尾递归函数
def fact2(number, sum):
if number == 1:
return sum;
return fact2(number - 1, number * sum);
print(fact2(5, 1));
# ==== 内置函数 ====
print(int("22")); # 数据类型转换函数,注意,如果定义变量名和函数名一样,则不会调用该函数,会报错
print(float("22.2"));
print(str(22));
print(abs(-111)); # abs函数,求绝对值
print(max(12, 34, 123.4)); # max函数,求最大值
print(min(-21, -11, 0, 22.3)); # min函数,求最小值
print(" aa bb cc ".strip()); # 字符串去前后空格
print("['6K-8K']".strip('[\'\']')); # 移除字符串头尾指定的字符
print(hex(12)); # hex函数,将十进制数转十六进制
print(math.sqrt(3)); # 求平方根
print(sum(range(1, 101))); # 求和
print(sum(list(range(101))));
print("cdaDcdsa".capitalize()); # 将字符串第一个字符变成大写,其他小写
def product(*numbers):
if len(numbers) == 0:
raise TypeError("Param is error.");
count = 1;
for i in numbers:
count *= i;
return count;
print(product(*[1,2,4,6,7]));
print(product(1,2,4,6,7));
def hannuota(n, a, b, c):
if n == 1:
print(a, "---->", c);
else:
hannuota(n - 1, a, c, b); # 借助C柱,将n-1个圆盘从A柱移动到B柱
print(a, "---->", c); # 将A柱最底层的圆盘移动到C柱
hannuota(n - 1, b, a, c); # 借助A柱,将n-1个圆盘从B柱移动到C柱
hannuota(3, "a", "b", "c");
# 方式一
'''
1、选择 (x² + y² - 1)³ - x²y³ = 0函数,函数等于0是一条心形线,取x,y=0,函数小于0的,说明内部小于0
2、y轴从上到下,step=-1,x轴从左到右计算
3、计算每行字符串:公式<=0,则添加给定的字符串,否则添加空格
4、将每行字符串放到list中
'''
def heart():
content = "I love U";
lines = [];
for y in range(12, -12, -1):
line = "";
for x in range(-30, 30):
# 和原始的公式不一样, 做了x、y轴的缩放
# formula = (x ** 2 + y ** 2 - 1) ** 3 - x ** 2 * y ** 3;
formula = ((x * 0.05) ** 2 + (y * 0.1) ** 2 - 1) ** 3 - (x * 0.05) ** 2 * (y * 0.1) ** 3;
if formula <= 0:
# lineStr += content[x % len(content)]; # 画出比较耿直的心
line += content[(x - y) % len(content)]; # 画出比较斜点的心
else:
line += " ";
lines.append(line);
# print("\n".join(lines));
# 打印颜色
for i in "\n".join(lines):
print("\033[91m" + i, end="", flush=True);
heart();
# 方式二
'''
现在将 print("\n".join(lines)); 反向组装,简化代码
每一行可以是字符串,也可以是一个char list,然后使用"".join的方式连接
for in的另外一种写法,在join括号内定义,外层join是y循环,里层join是x循环
'''
print("\n".join("".join("Love"[(x-y) % len("Love")]for x in range(-30, 30))for y in range(12, -12, -1)));
'''
上面的代码打印出来的是一个长方形,现在要加入判断,引入心形公式
if判断逻辑决定是添加字符串字符,还是添加" ",所以在这个逻辑外面添加括号
这样就将上面的代码转为一句代码完成
'''
print("\n".join("".join(("Love"[(x-y) % len("Love")] if ((x * 0.05) ** 2 + (y * 0.1) ** 2 - 1) ** 3 -
(x * 0.05) ** 2 * (y * 0.1) ** 3 <= 0 else " ")for x in range(-30, 30))for y in range(12, -12, -1)));
# ==== list切片 ====
l = list(range(100));
print(l[:10]);
print(l[10:20]);
print(l[-10:]);
print(l[-20:-10]);
print(l[:10:2]); # 前十个,每两个取一个
print(l[::3]); # 所有数,每三个取一个
print(l[::-1]); # 所有数,倒序
print(l[:]); # 所有数
# ==== tuple切片 ====
t = tuple(range(10));
print(t[:5]);
print(t[2:5]);
print(t[-3:]);
print(t[-5:-3]);
print(t[2:8:2]); # 取2-8位,每两个取一个
print(t[::2]); # 所有数,两个取一个
print(t[::-1]); # 所有数,倒序
print(t[:]); # 所有数
# ==== 字符串切片 ====
s = "cdvdjkcdncdsc;adcjdsna";
print(s[:5]);
print(s[2:5]);
print(s[-3:]);
print(s[-5:-2]);
print(s[2:8:3]); # 取2-8位字符,每三个取一个
print(s[::3]); # 所有字符,每三个取一个
print(s[::-1]); # 所有字符,倒序
print(s[:]); # 所有字符
from collections.abc import Iterable;
# ==== list迭代 ====
l = list(range(10));
print(isinstance(l, Iterable));
for item in l:
print(item);
# enumerate 函数作用于可遍历对象,列出下标
for i, item in enumerate(l):
print(i, item);
# ==== tuple迭代 ====
t = tuple(range(10));
print(isinstance(t, Iterable));
for item in t:
print(item);
for i, item in enumerate(t):
print(i, item);
# ==== dict迭代 ====
d = {"name":"hj", "age":30, "city":"cd"};
print(isinstance(d, Iterable));
for key in d:
print(key, d.get(key));
for value in d.values():
print(value);
for key, value in d.items():
print(key, value);
# ==== 字符串迭代 ====
s = "vdsvfvadas";
print(isinstance(s, Iterable));
for char in s:
print(char);
# ==== 列表生成式 ====
l = list(range(10));
d = {"a":1,"b":2,"c":3};
l_ = [item * item for item in l];
l_ = [item * item for item in l if item % 2 == 0];
l_ = [item.lower() for item in ["Hyamn", 10, None, "Hu"] if isinstance(item, str)];
l_ = [x + y for x in "abc" for y in "qwe"];
l_ = [item.upper() for item in "asd"];
l_ = [item.lower() for item in l_];
# 注意,value值如果是int,需要使用str函数转换,不然会报错误
l_ = [key + "=" + str(value) for key, value in d.items()];
# 导入os模块,遍历当前文件夹下文件
l_ = [f for f in os.listdir(".")];
print(l_);
# ---- 列表转生成器 ----
g = (item for item in range(10));
print(next(g), g.__next__(), next(g));
# ---- 函数转生成器 ----
def fun():
i = 1;
while True:
temp = yield i ** 2;
print(temp);
i += 1;
f = fun();
# 第一次运行只能使用next或者send(None)
# send函数是Generator类的方法,作用相当于使生成器继续运行
# send参数,传递给yield返回值,即temp = "Hello World"
print(next(f), f.__next__(), next(f), f.send("hello world"));
from collections.abc import Iterable, Iterator;
# ==== 迭代对象、迭代器 ====
print(isinstance([], Iterable));
print(isinstance({}, Iterable));
print(isinstance("dsadas", Iterable));
print(isinstance((item for item in range(10)), Iterable));
print(isinstance([], Iterator));
print(isinstance({}, Iterator));
print(isinstance("dsadas", Iterator));
print(isinstance((item for item in range(10)), Iterator)); # 生成器是迭代器,其它的迭代对象不是
print(isinstance(iter([]), Iterator)); # 调用iter()函数将迭代对象转化为迭代器
print(isinstance(iter({}), Iterator));
print(isinstance(iter("sfdsfs"), Iterator));
def fibonacci(max):
# 相当于 t = (0, 0, 1); n = t[0]; a = t[1]; b = t[2];
n, a, b = 0, 0, 1;
sb = "";
while n <= max:
sb += str(b) + ",";
a, b = b, a + b;
n += 1;
return sb[:-1];
print(fibonacci(20));
# 定义一个trim()函数,去掉前后空格
def trim(s):
if len(s) == 0 or s.isspace():
return '';
while s[0] == ' ':
s = s[1:];
while s[-1] == ' ':
s = s[:-1];
return s;
print(trim(""));
print(trim(" dsadad "));
import re;
def fun_2(s):
if s.startswith(' ') or s.endswith(' '):
return re.sub(r"^(\s+)|(\s+)$", "", s);
def find_max_min(l):
if len(l) == 0:
return (None, None);
if len(l) == 1:
return (l[0], l[0]);
max = l[0];
min = l[0];
for item in l:
if item > max:
max = item;
if item < min:
min = item;
return (min, max);
print(find_max_min([]));
print(find_max_min([1]));
print(find_max_min(range(10)));
# ---- 杨辉三角形,函数生成器 ----
# 把每一行看做一个list,下一行除了头和尾,是上一行每两个数之和
def yanghui():
l = [1];
while True:
yield l;
l = [1] + [l[n] + l[n + 1] for n in range(len(l) - 1)] + [1];
n = 0;
result =[];
for y in yanghui():
# print(y);
result.append(y)
n += 1;
if n == 10:
break;
print(result);
# ---- logging console ----
import logging;
# logging.basicConfig(level=logging.NOTSET);
logging.basicConfig(level=logging.DEBUG,
format="%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s");
logging.debug("苍老师");
logging.info("松岛枫");
logging.warning("小泽玛利亚");
logging.error("波多野结衣");
logging.critical("吉泽明步");
# ---- logging file ----
import logging;
import os.path;
import time;
# 设置变量
fileFormat = "PythonLog_" + time.strftime("%Y%m%d", time.localtime(time.time()));
logFormat = logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s");
logPath = "/log";
logLevel = logging.DEBUG;
# 创建logger
logger = logging.getLogger();
logger.setLevel(logLevel);
# 创建一个 stream handler,输出到控制台 --- 无需设置,logger会自动输出到控制台
# logConsoleHandler = logging.StreamHandler();
# logConsoleHandler.setLevel(logLevel);
# logConsoleHandler.setFormatter(logFormat);
# logger.addHandler(logConsoleHandler);
# 创建一个 file handler,输出到文件
logFile = os.path.abspath(logPath) + "/" + fileFormat + ".log";
logFileHandler = logging.FileHandler(logFile, mode = "w");
logFileHandler.setLevel(logLevel);
logFileHandler.setFormatter(logFormat);
logger.addHandler(logFileHandler);
# 调用日志
logger.debug("论完美不如松岛枫……");
logger.info("论身材不如濑亚美莉……");
logger.warning("论颜值不如波多野结衣……");
logger.error("论业绩不如吉泽明步……");
logger.critical("但是苍老师最红,你有意见么……");
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__author__ = "HymanHu";
import os, time;
import logging,logging.config;
from logging.handlers import RotatingFileHandler;
# 设置变量
fileFormat = "PythonLog_" + time.strftime("%Y%m%d", time.localtime(time.time()));
logFormat = logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s");
logPath = "/log";
logLevel = logging.DEBUG;
# 获得logger
def getLogger():
logger = logging.getLogger();
logger.setLevel(logLevel);
# 创建一个 stream handler,输出到控制台
logConsoleHandler = logging.StreamHandler();
logConsoleHandler.setLevel(logLevel);
logConsoleHandler.setFormatter(logFormat);
logger.addHandler(logConsoleHandler);
# 创建一个 file handler,输出到文件
logFile = os.path.abspath(logPath) + "/" + fileFormat + ".log";
# logFileHandler = logging.FileHandler(logFile, mode="w");
logFileHandler = RotatingFileHandler(logFile,maxBytes=100*1024*1024,backupCount=10);
logFileHandler.setLevel(logLevel);
logFileHandler.setFormatter(logFormat);
logger.addHandler(logFileHandler);
return logger;
# 创建全局logger对象,命名为LOGGER
LOGGER = getLogger();
from www.globalLog import LOGGER;
LOGGER.debug("Sql: %s, Args: %s" % (sql, args));
def fn():
return 1 / int("s");
def fn1():
return fn() * 2;
def fn2():
print(fn1() * 3);
fn2();
# ---- 自定义错误 ----
class MyException(Exception):
pass;
# ---- try、except、finally ----
try:
print("start try block");
a = 1 / int("a");
# a = 1 / 2;
print("result is %s" % a);
except ValueError as e:
logger.debug(e);
logging.exception(e);
except ZeroDivisionError as e:
logger.debug(e);
else:
logger.debug("no exception");
finally:
print("start finally block");
print("end");
# ---- with语句 ----
try:
f = open("D:\\projectCode\\hqyj\python_demo\\resource\\test.txt", "r", encoding = "utf-8");
print(f.readlines());
except IOError as e:
print(e);
finally:
if f:
f.close();
# 相当于上面的代码,简化了try----except----finally模块
with open("D:\\projectCode\\hqyj\python_demo\\resource\\test.txt", "r", encoding = "utf-8") as f:
print(f.readlines());
# ---- 抛出错误 ----
def fn(x):
if not isinstance(x, int):
raise TypeError("Type error");
fn("cdsa");
# ---- 断言调试 ----
def fn(x):
n = int(x);
assert n != 0, "n is Zero";
return 10 / n;
print(fn(0));
# err.py
import pdb
s = '0'
n = int(s)
pdb.set_trace() # 运行到这里会自动暂停
print(10 / n)
# ---- MyDict.py ----
class MyDict(dict):
def __init__(self, **kw):
super().__init__(**kw);
def __getattr__(self, key):
try:
return self[key];
except KeyError:
raise AttributeError("Dict object has no attribute for %s" % key);
def __setattr__(self, key, value):
self[key] = value;
# ---- TestMyDict.py ----
import unittest;
from com.thornBird.base.MyDict import MyDict;
class TestMyDict(unittest.TestCase):
# 在调用每一个测试方法前执行
def setUp(self):
print("Before Test");
# 在调用每一个测试方法后执行
def tearDown(self):
print("After Test");
def testInit(self):
print("Test 1");
d = MyDict(a = 1, b = "aaa");
self.assertEqual(d.a, 1);
self.assertEqual(d.b, "aaa");
self.assertTrue(isinstance(d, dict));
def testDictError(self):
print("Test 2");
d = MyDict();
# 断言错误,d.testKey访问不存在的key时,我们期待抛出AttributeError
with self.assertRaises(AttributeError):
value = d.testKey;
# 加上这两行代码,我们就能将该测试文件当作Python脚本运行
if __name__ == '__main__':
unittest.main();
class MyDict(dict):
'''
>>> d1 = MyDict()
>>> d1['x'] = 100
>>> d1.x
100
'''''
def __init__(self, **kw):
super().__init__(**kw);
def __getattr__(self, key):
try:
return self[key];
except KeyError:
raise AttributeError("Dict object has no attribute for %s" % key);
def __setattr__(self, key, value):
self[key] = value;
# 引入文档测试模块
if __name__ == "__main__":
import doctest;
doctest.testmod();
from functools import reduce
def str2Num(s):
return int(s);
def calc(exp):
ss = exp.split('+');
ns = map(str2Num, ss);
return reduce(lambda acc, x: acc + x, ns);
def main():
r = calc('100 + 200 + 345');
print('100 + 200 + 345 =', r);
r = calc('99 + 88 + 7.6');
print('99 + 88 + 7.6 =', r);
main();
# ---- 传入参数 ----
def functionCall(x, y, f):
return f(x) + f(y);
f = abs;
print(functionCall(-1, -2, f));
# ---- map函数 ----
def f(x):
return x * x;
i = map(f, list(range(10)));
print(list(i));
print(list(map(str, range(10)))); # 将list元素转化为string
# ---- reduce函数 ----
# ---- 将序列[1, 3, 5, 7, 9]变换成整数13579 ----
from functools import reduce;
def f(x, y):
return x * 10 + y;
print(reduce(f, [1, 3, 5, 7, 9]));
# ---- 传入str,依次到map中找对应的int进行拼接 ----
def str2int(s):
def f1(x, y):
return x * 10 + y;
m = {'0': 0, '1': 1, '2': 2, '3': 3, '4': 4, '5': 5, '6': 6, '7': 7, '8': 8, '9': 9};
def f2(x):
return m[x];
return reduce(f1, map(f2, s));
print(str2int("13579"));
# ---- 还可以使用lambda表达式简化函数定义 ----
m = {'0': 0, '1': 1, '2': 2, '3': 3, '4': 4, '5': 5, '6': 6, '7': 7, '8': 8, '9': 9};
def f3(x):
return m[x];
def str2int2(s):
return reduce(lambda x, y : x * 10 + y, map(f3, "13957"));
# ---- filter函数 ----
def notEmpty(x):
return x and x.strip();
print(list(filter(notEmpty, ['A', '', 'B', None, 'C', ' '])));
# ---- sorted函数 ----
print(sorted([36, 5, -12, 9, -21]));
print(sorted([36, 5, -12, 9, -21], key = abs));
print(sorted(['bob', 'about', 'Zoo', 'Credit'])); # 默认根据第一个字符ASCII码
print(sorted(['bob', 'about', 'Zoo', 'Credit'], key = str.lower));
print(sorted(['bob', 'about', 'Zoo', 'Credit'], key = str.lower, reverse=True));
# ---- 整合 ----
l = list(range(10));
# l.sort(key=lambda item : item, reverse=True);
print(l);
print(list(sorted(l, key=lambda item : item, reverse=True)));
print(list(map(lambda item : item * item, l)));
print(list(filter(lambda item : item % 2 == 0, l)));
print(reduce(lambda x, y : x + y, l));
# ---- 返回函数 ----
def f():
l = [];
for i in range(1, 4):
def f2():
return i * i;
l.append(f2);
return l;
# 返回的函数引用了变量i,但它并非立刻执行,等到3个函数都返回时,变量i已经变成了3,因此最终结果为9
l1, l2, l3 = f();
print(l1()); # 打印结果9
print(l2()); # 打印结果9
print(l3()); # 打印结果9
# 再创建一个函数,用该函数的参数绑定循环变量当前的值,无论该循环变量后续如何更改,已绑定到函数参数的值不变
def f():
def f2(j):
def f3():
return j * j;
return f3;
l = [];
for i in range(1, 4):
l.append(f2(i));
return l;
l1, l2, l3 = f();
print(l1()); # 打印结果1
print(l2()); # 打印结果4
print(l3()); # 打印结果9
# ---- 匿名函数 ----
print(list(filter(lambda x : x % 2 == 0, range(1, 20))));
# ---- 装饰器 ----
import time;
def log(fun):
def wrapper(*args, **keywords):
print("call %s()" % fun.__name__); # 调用函数对象name属性
return fun(*args, **keywords);
return wrapper;
def f():
print(time.time());
print(time.localtime(time.time()));
print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())));
f();
# ---- 带参数的装饰器 ----
def log(text):
def decorator(fun):
def wrapper(*args, **kw):
print("%s %s()" % (text, fun.__name__));
return fun(*args, **kw);
return wrapper;
return decorator;
"execute") (
def f():
print(time.time());
print(time.localtime(time.time()));
print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())));
f();
# ---- 偏函数 ----
import functools;
int2 = functools.partial(int, base=2); # 定义新函数,固定住**kw中base=2参数;
print(int2("101010"));
max2 = functools.partial(max, 10); #定义新函数,固定住*args参数中的10,也就是10是固定参数
print(max2(2, 4, 6, 9)); # 相当于max2(10, 2, 4, 6, 9);结果是10
l = ['adam', 'LISA', 'barT'];
def f(x):
return x.capitalize();
print(list(map(f, l)));
# ---- 采用lambda表达式 ----
print(list(map(lambda x : x.capitalize(), l)));
# 求累积
def f(l):
return reduce(lambda x, y : x * y, l);
print(f([1, 2, 3, 4]));
# 先来一个示例,处理小数部分
print(reduce(lambda x, y : x / 10 + y, map(int, list(range(10))[::-1] + [0])));
# string to float
def str2Float(x):
l1, l2 = x.split(".");
l2 = l2[::-1] + "0";
return reduce(lambda x, y : x * 10 + y, map(int, l1)) + reduce(lambda x, y : x / 10 + y, map(int, l2));
print(str2Float("32142.4321"));
# 定义一个奇数迭代器
def oddIter():
n = 1;
while True:
n += 2;
yield n;
# 定义筛选函数,返回是另一个函数
def validate(n):
return lambda x : x % n > 0;
# 求素数迭代器
def primeNumber():
yield 2;
l = oddIter();
while True:
n = next(l);
yield n;
l = filter(validate(n), l);
l = [];
# 打印100内的素数
for n in primeNumber():
if n < 100:
l.append(n);
else:
break;
print(",".join(map(str, l)));
# 注意else缩进块,是和for同级的,意思是跳出for循环后再来干擦屁股的事
l = [];
for i in range(2, 100):
# for j in range(2, i):
for j in range(2, (int)(math.sqrt(i) + 1)):
if i % j == 0:
break;
else:
l.append(i);
print(",".join(map(str, l)));
print("-".join(["a", "b", "c"])); # 将字符串join插入list,返回字符串a-b-c
print(list("%s" % x for x in range(2, 5))); # 格式化int迭代器,并转化为字符串list:['2', '3', '4']
print(",".join("%s" % x for x in range(2, 5))); #结合上面两个功能,输出2,3,4
print(list(x for x in range(2, 10))); # 输出int list
print([y for y in range(2, 10) if 10 % y == 0]); # 输出循环中最大数的因子数列[2, 5]
# int list 排除所有的因子数列,得到素数数列,格式化为字符串,调用join函数,返回素数字符串
print(",".join("%s" % x for x in range(2, 100) if not [y for y in range(2, x) if x % y == 0]));
def f(n):
l = [];
for item in str(n):
l.insert(0, item);
return "".join(l) == str(n);
print(list(filter(f, range(1,100))));
def fn(n):
return str(n) == reduce(lambda x, y : y + x, str(n));
print(list(filter(fn, range(1, 100))));
# 方式一,外层用整形,内层要改变外层的变量,需要使用nonlocal关键字,不推荐这种方式
def fn():
x = 0;
def fun():
nonlocal x;
x += 1;
return x;
return fun;
f1 = fn();
print(f1(), f1(), f1());
# 方式二,外层使用list,内层可以修改list元素
def fn():
x = [0];
def fun():
x[0] += 1;
return x[0];
return fun;
f1 = fn();
print(f1(), f1(), f1());
def logDecorator(fun):
def wraper(*args, **kw):
print("Call %s in %s" % (fun.__name__, time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))));
return fun(*args, **kw);
return wraper;
def test():
print("test");
test();
# ---- 继承object类 ----
class Student(object):
def __init__(self, id, name, age):
self.__id = id;
self.name = name;
self.age = age;
def setId(self, id):
if id > 0:
self.__id = id;
else:
raise ValueError("Bad data");
def getId(self):
return self.__id;
def printStudentInfo(self):
print("%s %s %s" % (self.__id, self.name, self.age));
def getGrade(self):
if self.age > 40:
return "C";
elif self.age > 20:
return "B";
else:
return "A";
hymanHu = Student(1, "HymanHu", 20);
jiangHu = Student(2, "Jianghu", 30);
hymanHu.printStudentInfo();
print(hymanHu.getGrade());
print(hymanHu.name);
# print(hymanHu.__id);
hymanHu.setId(2);
print(hymanHu.getId());
jiangHu.printStudentInfo();
print(jiangHu.getGrade());
# ---- type创建类 ----
def fn(self, name="Hyman"):
print("name is %s" % name);
Hello = type("Hello", (object,), dict(hello = fn));
hello = Hello();
hello.hello();
# ---- 元类创建类 ----
class MyMetaclass(type):
def __new__(typ, classNmae, parentClassList, classAttrs):
classAttrs["add"] = lambda self, value : self.append(value);
return type.__new__(typ, classNmae, parentClassList, classAttrs);
class MyList(list, metaclass=MyMetaclass):
pass;
l = MyList();
l.add(1);
print(l);
# ----枚举类 ----
from enum import Enum, unique;
Month = Enum("Month", ('Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'));
# 获取Month属性和方法列表
print(dir(Month));
# value属性则是自动赋给成员的int常量,默认从1开始计数
print(Month.Jan.name, Month.Jan.value);
for month, item in Month.__members__.items():
print(month, item.value);
# unique 装饰器检查有没有重复值
class WeekDay(Enum):
Sun = 0;
Mon = 1;
Tue = 2;
Wed = 3;
Thu = 4;
Fri = 5;
Sat = 6;
print(WeekDay.Mon.name, WeekDay.Mon.value, WeekDay(1));
for weekDay, item in WeekDay.__members__.items():
print(weekDay, item.value);
# ---- 类属性&&实例属性 ----
class Student(object):
name = "HymanHu"
hyman = Student();
print(Student.name); # 类属性,结果HymanHu
print(hyman.name); # 实例属性,在没有给实例属性赋值,且属性名和类属性一样的情况下,值相同,HymanHu
hyman.name = "JiangHu";
print(Student.name); # 类属性,结果HymanHu
print(hyman.name); # 实例属性,结果JiangHu
# ---- 限制实例属性的动态绑定 ----
class Human(object):
__slots__ = ("name", "age");
hyman = Human();
hyman.name = "Hyman";
#hyman.sex = "man";
print(hyman.name);
#print(hyman.sex); # AttributeError: 'Human' object has no attribute 'sex'
class Student(object):
def __init__(self, name):
self.name = name;
def __str__(self):
return "Student object: %s" % self.name;
__repr__ = __str__;
hyman = Student("Hyman");
print(hyman);
# ---- 以类的方式实现斐波拉数列 ----
class Fib(object):
def __init__(self, max):
self.a, self.b = 0, 1;
self.max = max;
def __iter__(self):
return self;
def __next__(self):
self.a, self.b = self.b, self.a + self.b;
if self.a > self.max:
raise StopIteration();
return self.a;
# 该方法返回迭代对象元素,item可能是int下标,也可以是切片对象slice,没有对step和负号处理
def __getitem__(self, item):
if isinstance(item, int):
a, b = 1, 1;
for i in range(item):
a, b = b, a + b;
return a;
if isinstance(item, slice):
start = item.start;
end = item.stop;
if start is None:
start = 0;
a, b = 1, 1;
L = [];
for i in range(end):
if i >= start:
L.append(a);
a, b = b, a + b;
return L;
result = "";
fib = Fib(100);
for i in fib:
result += str(i) + ",";
print(result[:-1]);
print(fib[3]);
print(fib[:5]);
# ---- 动态处理类的属性和方法 ----
# ---- buil rest api uri ----
class Chain(object):
def __init__(self, path = ""):
self.path = path;
def __getattr__(self, item):
return Chain("%s/%s" % (self.path, item));
def __str__(self):
return self.path;
__repr__ = __str__;
chain = Chain();
# /user/status/list
print(chain.user.status.list);
# ---- call方法 ----
class Student(object):
def __init__(self, name):
self.name = name;
def __call__(self, *args, **kwargs):
print("My name is %s" % self.name);
student = Student("Hyman");
print(callable(student));
student();
# ---- get、set方法
class Student(object):
# _age = 0;
def getAge(self):
return self._age;
setter .
def setAge(self, age):
if not isinstance(age, int):
raise ValueError("Age must integer.");
if age < 0 or age > 200:
raise ValueError("Age must be in 0 ~ 200.");
self._age = age;
hyman = Student();
# hyman.setAge(44);
hyman.setAge = 44;
print(hyman.getAge);
# ---- 动态传递参数 ----
class Clock(object):
def run(self):
print("Clock running……");
def run(animal):
animal.run();
dog = Dog();
cat = Cat();
clock = Clock();
run(dog);
run(cat);
run(clock);
# ---- 动态绑定属性和方法 ----
from types import MethodType;
class Human(object):
pass;
def setAge(self, age):
self.age = age;
def setSex(self, sex):
self.sex = sex;
Human.setSex = setSex;
# 给类动态绑定方法
jianghu = Human();
jianghu.setSex("man");
print(jianghu.sex);
# 给实例绑定属性和方法,但是对另外的实例不起作用
hyman = Human();
hyman.name = "HymanHu";
hyman.setAge = MethodType(setAge, hyman); # 第一参数函数名,第二个参数是实例对象
hyman.setAge(33);
print(hyman.name);
print(hyman.age);
class Animal(object):
def run(self):
print("Animal running……");
class Dog(Animal):
pass;
class Cat(Animal):
pass;
dog = Dog();
cat = Cat();
print(isinstance(dog, Dog));
print(isinstance(dog, Animal));
dog.run();
cat.run();
# ---- 多重继承 ----
class FlyMixIn(object):
def fly(self):
print("Animal fly……");
class Bird(Animal, FlyMixIn):
pass;
bird.run();
bird.fly();
class Animal(object):
def run(self):
print("Animal running……");
class Dog(Animal):
def run(self):
print("Dog running……");
class Cat(Animal):
def run(self):
print("Cat running……");
dog = Dog();
cat = Cat();
dog.run();
cat.run();
# ---- 类型判断一 ----
print(type(123)); # <class 'int'>
print(type("aaaa")); # <class 'str'>
print(type(None)); # <class 'NoneType'>
print(type(abs)); # <class 'builtin_function_or_method'>
print(type(dog)); # <class '__main__.Dog'>
import types;
print(type("cdsa") == str);
print(type(run) == types.FunctionType);
print(type(abs) == types.BuiltinFunctionType);
print(type(lambda x : x + 1) == types.LambdaType);
print(type((x for x in range(1, 10))) == types.GeneratorType);
# ---- 类型判断二 ----
print(isinstance("dsac", str));
print(isinstance([1, 2], (list, tuple))); # 还能用几个候选项用于判断
print(isinstance(run, types.FunctionType));
print(isinstance(dog, Animal));
# ---- 获取对象属性和方法 ----
print(dir("dsad"));
print(dir(dog));
# ---- 操作对象属性 ----
print(hasattr(hymanHu, "name"));
setattr(hymanHu, "name", "aaaaa");
print(getattr(hymanHu, "name"));
#print(getattr(hymanHu, "name1")); # 获取不存在的属性,抛出AttributeError
# ---- 操作对象方法 ----
print(hasattr(hymanHu, "printStudentInfo"));
func = getattr(hymanHu, "printStudentInfo");
func();
class Student(object):
count = 0;
def __init__(self, name):
Student.count += 1;
self.name = name;
hyman = Student("HymanHu");
jianghu = Student("JiangHu");
print(Student.count);
import datetime;
class Student(object):
def getAge(self):
return self._age;
setter .
def setAge(self, age):
if not isinstance(age, int):
raise ValueError("Age must integer.");
if age < 0 or age > 200:
raise ValueError("Age must be in 0 ~ 200.");
self._age = age;
def birthday(self):
return datetime.datetime.now().year - self._age;
hyman = Student();
# hyman.setAge(44);
hyman.setAge = 44;
print(hyman.getAge);
print(hyman.birthday);
class Gender(Enum):
Male = 0;
Female = 1;
class Student(object):
def __init__(self, name, gender):
self.name = name;
self.gender = gender;
hyman = Student("Hyman", Gender.Male);
print(hyman.name, hyman.gender);
# 创建列类,添加列名和数据类型属性
class Field(object):
def __init__(self, columnName, columnType):
self.columnName = columnName;
self.columnType = columnType;
def __str__(self):
return "<%s : %s>" % (self.__class__.__name__, self.columnName);
__repr__ = __str__;
# 根据不同的数据类型创建不同的列子类
class StringField(Field):
def __init__(self, columnName):
super(StringField, self).__init__(columnName, "varchar(100)");
class IntegerField(Field):
def __init__(self, columnName):
super(IntegerField, self).__init__(columnName, "bigint");
field = Field("userId", int);
print(field);
# 创建Model元类
class ModelMetaclass(type):
'''
思路:Model是dict子类,每个key对应一个列对象
1、将不同的Model子类中classAttrs(属性和方法)的属性过滤出来,装载到mappings里面
2、返回Model对应的表名,在此默认Model子类类名
'''
def __new__(typ, className, parentClassList, classAttrs):
print("Metaclass new model……");
if className == "Model":
return type.__new__(typ, className, parentClassList, classAttrs);
print("Found model: %s" % className);
mappings = dict();
for key, value in classAttrs.items():
if isinstance(value, Field):
print("Found mappings: %s : %s" % (key, value));
mappings[key] = value;
# 删除classAttrs中的列属性
for key in mappings.keys():
classAttrs.pop(key);
classAttrs["__mappings__"] = mappings; # 列和属性的映射关系
classAttrs["__table__"] = className; # 假定表名和类名一致
return type.__new__(typ, className, parentClassList, classAttrs);
# 由ModelMetaclass创建Model
class Model(dict, metaclass=ModelMetaclass):
def __init__(self, **kw):
print("init model……");
super(Model, self).__init__(**kw);
def __getattr__(self, key):
try:
return self[key];
except KeyError:
raise AttributeError("Model object has no attribute %s" % key);
def __setattr__(self, key, value):
self[key] = value;
'''
1、遍历mappings,得到对应的列对象,build sql语句
2、调用getattr函数,将每列的值装载到args这个list中
'''
def save(self):
fields = [];
params = [];
args = [];
for key, value in self.__mappings__.items():
fields.append(value.columnName);
params.append("?");
args.append(getattr(self, key, None));
sql = "insert into %s (%s) values (%s)" % (self.__table__, ",".join(fields), ",".join(params));
print("SQL : %s" % sql);
print("Args : %s" % str(args));
class User(Model):
id = IntegerField("id");
name = StringField("user_name");
email = StringField("email");
password = StringField("password");
user = User(id = 1, name = "HymanHu", email = "HymanHu@163.com", password = "111111");
user.save();
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__author__ = "HymanHu";
'''
模块注释文档
'''
# 将项目根目录添加到 sys.path,解决 cmd 下执行该模块找不到包的问题
import sys, os;
current_path = os.path.abspath(os.path.dirname(__file__));
separator = "\\" if os.name == "nt" else "/";
project_name = "python_spider" + separator;
root_path = current_path[:current_path.find(project_name) + len(project_name)]; # 获取项目根目录
sys.path.append(root_path);
import sys
def test():
args = sys.argv
if len(args)==1:
print('Hello, world!')
elif len(args)==2:
print('Hello, %s!' % args[1])
else:
print('Too many arguments!')
if __name__=='__main__':
test()
# ---- 根据模块路径获取module ----
def getModel(module_name):
n = module_name.rfind('.');
if n == (-1):
mod = __import__(module_name, globals(), locals());
else:
name = module_name[n + 1:];
mod = getattr(__import__(module_name[:n], globals(), locals(), [name]), name);
for attr in dir(mod):
if attr.startswith('_'):
continue;
fn = getattr(mod, attr);
print(fn);
# 获取fn属性
# if callable(fn):
# method = getattr(fn, '__method__', None);
# path = getattr(fn, '__route__', None);
module = getModel("com.thornBird.base.io.ProduceAndConsumer");
# ---- fork函数 ----
# 只在Unix/Linux系统上执行
import os;
print("Process %s start" % os.getpid());
childPId = os.fork();
if childPId == 0:
print("I am child process(%s) and my parent is %s" % (os.getpid(), os.getppid()));
else:
print("I(%s) just create child process, id is %s" % (os.getpid(), childPId));
# ---- Process ----
from multiprocessing import Process;
import os;
def processCall(name):
print("Run process %s(pid = %s)" % (name, os.getpid()));
if __name__ == "__main__":
print("Parent Process is %s" % (os.getpid()));
# 创建进程实例,传入函数以及参数
childProcess = Process(target=processCall, args=("test",));
print("Child process start");
# 开始进程
childProcess.start();
# 等childProcess执行完
childProcess.join();
print("Child process is end");
# ---- Pool ----
from multiprocessing import Pool;
import os, time, random;
def logTime(processName):
print("Run task(%s), pid is %s" % (processName, os.getpid()));
start = time.time();
time.sleep(random.random() * 3);
end = time.time();
print("This task(%s) run %0.2f seconds" % (processName, (end - start)));
if __name__ == "__main__":
print("Parent process is %s" % os.getpid());
pool = Pool(4);
for i in range(5):
pool.apply_async(logTime, args=("process" + str(i),));
print("Waiting all process done");
# close调用后不能再添加进程了
pool.close();
# join等待多个子进程执行完毕
pool.join();
print("All process done");
# ---- subprocess输出 ----
# 模拟执行命令nslookup www.baidu.com
import subprocess;
print("$ nslookup www.baidu.com");
r = subprocess.call(["nslookup", "www.baidu.com"]);
print("Result:", r);
# ---- subprocess输入 ----
import subprocess;
print("$ nslookup");
subP = subprocess.Popen(['nslookup'], stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE);
'''
communicate模拟输入
相当于输入以下三行命令
set q=mx
python.org
exit
并将执行的结果放到output、err两个变量中,代表正确执行结果和错误结果
'''
output, err = subP.communicate(b'set q=mx\npython.org\nexit\n');
print(output);
print("Exit code:", subP.returncode);
# ---- 进程通信 ----
from multiprocessing import Process, Queue;
import os, time, random;
def writeQueue(queue):
print("Process(%s) to write" % os.getpid());
for value in ["A", "B", "C"]:
print("Put %s into queue" % value);
queue.put(value);
time.sleep(random.random());
def readQueue(queue):
print("Process(%s) to read" % os.getpid());
while True:
value = queue.get(True);
print("Read value %s from queue" % value);
if __name__ == "__main__":
queue = Queue();
pw = Process(target=writeQueue, args=(queue,));
pr = Process(target=readQueue, args=(queue,));
pw.start();
pr.start();
# 等待pw进程结束
pw.join();
# pr进程里面是死循环,强行结束
pr.terminate();
import time, random, queue;
from multiprocessing.managers import BaseManager;
# 发送任务队列
sendQueue = queue.Queue();
# 接收任务队列
receiveQueue = queue.Queue();
'''
为什么要定义这两个函数?
如果在初始化manager的时候,使用lambda表达式,那么在创先进程的时候会报错
_pickle.PicklingError: Can't pickle <function <lambda> at 0x000002C4573B51F0>:
attribute lookup <lambda> on __main__ failed
Windows没有fork函数,multiprocessing需要模拟fork的效果,是通过pickle实现的,
报错信息可知在pickle时候不能调用lambda表达式,所以我们创建函数替换lambda
'''
def getSendQueue():
global sendQueue;
return sendQueue;
def getReceiveQueue():
global receiveQueue;
return receiveQueue;
# 创建QueueManager继承BaseManager
class QueueManager(BaseManager):
pass;
if __name__ == "__main__":
# 将两个队列QueueManager上注册,参数一是注册名,参数二关联Queue对象
# QueueManager.register("sendQueue", callable = lambda : sendQueue);
# QueueManager.register("receiveQueue", callable = lambda : receiveQueue);
QueueManager.register("sendQueue", callable=getSendQueue);
QueueManager.register("receiveQueue", callable=getReceiveQueue);
# 创建QueueManager对象,绑定8080端口,设置验证码‘abc’
manager = QueueManager(address=("127.0.0.1", 8080), authkey=b'abc');
# 启动manager
manager.start();
# 从manager中取出send queue,不能对原生的sendQueue操作,会绕过manager的封装
# 根据注册名来获取
send = manager.sendQueue();
receive = manager.receiveQueue();
# 在send queue中放任务
for i in range(5):
n = random.randint(0, 100);
print("Put %d in send Queue" % n);
send.put(n);
# 从receive queue中拿到queue对象
print("Try to get reveive queue");
for i in range(5):
r = receive.get(timeout = 10);
print("Result is %d" % r);
# 关闭manager
manager.shutdown();
print("Master exit");
import time, sys, queue;
from multiprocessing.managers import BaseManager;
# 创建QueueManager,继承BaseManager
class QueueManager(BaseManager):
pass;
# worker端从网络上获取queue,所以只需注册名称
QueueManager.register("sendQueue");
QueueManager.register("receiveQueue");
# 创建QueueManager对象,连接master manager
serverAddr = "127.0.0.1";
print("Connect server %s" % serverAddr);
manager = QueueManager(address=("127.0.0.1", 8080), authkey=b'abc');
# 从网络连接master
manager.connect();
# 获取queue对象
sendQueue = manager.sendQueue();
receiveQueue = manager.receiveQueue();
# 从sendQueue中取任务,并将处理结果装到receiveQueue中
for i in range(5):
try:
n = sendQueue.get(timeout = 1);
result = n * n;
print("Run task: %d * %d = %d" % (n, n, result));
time.sleep(1);
receiveQueue.put(result);
except queue.Queue.Empty:
print("Serd Queue is empty");
# worker Process 结束
print("Worker exit");
# ---- Thread ----
import threading, time;
def loop():
print("Thread %s is running" % threading.current_thread().name);
for i in range(5):
print("Thread %s print number %d" % (threading.current_thread().name, i));
time.sleep(1);
print("Thread %s is end" % threading.current_thread().name);
print("Thread %s is running" % threading.current_thread().name);
loopThread = threading.Thread(target=loop, name="LoopThread");
loopThread.start();
loopThread.join();
print("Thread %s is end" % threading.current_thread().name);
# ---- 多线程死循环 ----
import threading, multiprocessing;
def loop():
x = 0;
while True:
x = x ^ 1;
for i in range(multiprocessing.cpu_count()):
thread = threading.Thread(target=loop);
thread.start();
# ---- Lock ----
import time, threading;
# 银行存款余额
balance = 0;
# 初始化锁对象
lock = threading.Lock();
def changeBalance(x):
# 函数内部对函数外的变量进行操作,就需要在函数内部声明其为global
global balance;
# 先存后取,balance期望值是0
balance = balance + x;
balance = balance - x;
def bankTread(x):
for i in range(1000000):
# 未加锁的情况
# changeBalance(x);
# 加锁的情况
lock.acquire(); # 获取线程锁
try:
changeBalance(x);
finally:
lock.release(); # 释放锁
thread1 = threading.Thread(target=bankTread, args=(4,), name="BankThread1");
thread2 = threading.Thread(target=bankTread, args=(8,), name="BankThread2");
thread1.start();
thread2.start();
thread1.join();
thread2.join();
print("The balance is %s" % balance);
# ---- ThreadLocal ----
import threading;
# 创建全局 ThreadLocal 变量
localThreadStudent = threading.local();
def threadMethod(name):
# 为每个线程修改 ThreadLocal 变量副本
localThreadStudent.name = name;
print("Thread %s student name %s" % (threading.current_thread().name, localThreadStudent.name));
thread1 = threading.Thread(target=threadMethod, args=("HymanHu",), name="ThreadA");
thread2 = threading.Thread(target=threadMethod, args=("JiangHu",), name="ThreadB");
thread1.start();
thread2.start();
thread1.join();
thread2.join();
# ---- 读取文件 ----
# 读取全部,也可调用read(size)按照字符长度读写
f = open("D:\\projectCode\\hqyj\python_demo\\resource\\test.txt", "r", encoding = "utf-8");
text = f.read();
print(text);
f.close();
# 读取到list中
f = open("D:\\projectCode\\hqyj\python_demo\\resource\\test.txt", "r", encoding = "utf-8");
lines = f.readlines();
print(lines); # 注意除开最后一行,其余每行都有\n换行符
for line in lines:
print(line.strip("\n"));
f.close();
# 读取文件捕获IO异常
try:
f = open("D:\\projectCode\\hqyj\python_demo\\resource\\test.txt", "r", encoding = "utf-8");
print(f.readlines());
except IOError as e:
print(e);
finally:
if f:
f.close();
# 使用with语句对try、except、finally模块的缩写
with open("D:\\projectCode\\hqyj\python_demo\\resource\\test.txt", "r", encoding = "utf-8") as f:
print(f.readlines());
# ----写文件 ----
# 追加内容
with open("D:\\projectCode\\hqyj\python_demo\\resource\\test.txt", "a", encoding = "utf-8") as f:
f.write("\n");
f.write("Hello world\n");
f.write("Hello world!");
# ---- StringIO ----
# 将字符串写入内存并获取写入的值
from io import StringIO;
sio = StringIO();
sio.write("Hello World!");
print(sio.getvalue());
# 读取StringIO的值,类似文本文件读取
sio = StringIO("Hello world\nHello world!");
print(sio.readlines());
# ---- BytesIO ----
# 将bytes写入内存
from io import BytesIO;
bio = BytesIO();
bio.write("你好".encode("utf-8"));
print(bio.getvalue());
# 读取BytesIO的值
bio = BytesIO(b'\xe4\xbd\xa0\xe5\xa5\xbd');
print(bio.read());
import os;
# ---- os ----
print(os.name); # 操作系统,nt(Windows)、posix(Linux、Unix、Mac OS X)
# print(os.uname()); # 操作系统详细信息,不支持Windows系统
print(os.environ); # 获取操作系统环境变量
print(os.environ.get("CLASSPATH")); # 获取环境变量中某个值
print(os.path.isfile(".")); # 判断是否为文件
print(os.path.isdir(".")); # 判断是否为文件夹
print(os.path.abspath(".")); # 查看当前目录绝对路径
current_path = os.path.abspath(os.path.dirname(__file__)); # 获取当前文夹路径
separator = "\\" if os.name == "nt" else "/";
project_name = "python_spider" + separator;
root_path = current_path[:current_path.find(project_name) + len(project_name)]; # 获取项目根目录
''''
join ---- 文件目录组装
split ---- 按最后一个分隔符将文件路径拆分成 tuple,最后一个元素是最后级别的目录或文件
why ---- 可以正确处理不同操作系统的分隔符,只是对字符串的操作
'''
print(os.path.join("D:/", "testDir"));
print(os.path.split("D:\\projectCode\\hqyj\\python_demo\\com\\thornBird\\base"));
print(os.path.splitext("D:\\sql\\testdb.sql")) # 拆分文件全路径,可以得到文件后缀
print(os.listdir(".")); # 文件夹下所有的文件和目录
print(os.listdir(os.path.split(os.path.abspath("."))[0])); # 查找当前目录同级文件和目录,包含自己
if not os.path.exists("D:/testDir"):
os.mkdir("D:/testDir"); # 创建文件夹
os.makedirs("d:/var/aaa/bbb"); # 创建多重文件夹
# os.rmdir("D:/testDir"); # 删除文件夹
if not os.path.exists("D:/testDir/test.txt"):
# os.mknod("test.txt"); # Windows不支持mknod
with open("D:/testDir/test.txt", "a", encoding="utf-8") as f:
f.write("\nHello world\n");
os.rename("D:/testDir/test.txt", "D:/testDir/test1.txt"); # 文件重命名
os.remove("D:/testDir/test1.txt"); # 删除文件
# ---- 序列化、反序列化 ----
import pickle;
d = dict(name = "hymanHu", age = 20);
# pickle.dumps && pickle.loads
db = pickle.dumps(d);
print(db);
d2 = pickle.loads(db);
print(d2);
# pickle.dump && pickle.load
f = open("D:\\testDir\\test.txt", "wb");
pickle.dump(d, f);
f.close();
f = open("D:\\testDir\\test.txt", "rb");
d1 = pickle.load(f);
f.close();
print(d1);
print([x for x in os.listdir(".")]); # 打印出当前目录下所有文件和目录
print([x for x in os.listdir(".") if os.path.isfile(x)]); # 打印出当前目录下所有文件
# 打印当前目录py文件
print([x for x in os.listdir(".") if os.path.isfile(x) and os.path.splitext(x)[1] == ".py"]);
def dirList(dir):
dir = os.path.abspath(dir);
for x in os.listdir(dir):
sonDir = os.path.join(dir, x);
if os.path.isdir(sonDir):
dirList(sonDir);
else:
print(sonDir);
dirList(".");
print("-----------------------------");
dirList("D:\\projectCode\\hqyj\\python_demo\\com");
print(os.path.isdir("D:\\projectCode\\hqyj\\python_demo\\com\\thornBird"));
def fileSearch(dir, keyWord):
dir = os.path.abspath(dir);
for item in os.listdir(dir):
sonDir = os.path.join(dir, item);
if os.path.isdir(sonDir):
fileSearch(sonDir, keyWord);
else:
if item.find(keyWord) != -1:
print(sonDir);
fileSearch("D:\\projectCode\\hqyj\\python_demo\\com", "init");
loop = get_event_loop();
while True:
event = loop.get_event();
process_event(event);
def A():
print('1');
print('2');
print('3');
def B():
print('x');
print('y');
print('z');
# 消费者(生成器generator)
def consumer():
r = "";
while True:
temp = yield r;
if not temp:
return;
print("[Consumer] consuming %s..." % temp);
r = "200 OK";
# 生产者
def produce(consumer):
consumer.send(None);
n = 0;
while n < 5:
n += 1;
print("[Produce] producing %s..." % n);
r = consumer.send(n);
print("[Consumer] consumer return %s" % r);
consumer.close();
consumer = consumer();
produce(consumer);
# ---- asyncio ----
import asyncio;
import threading;
# @asyncio.coroutine把一个generator标记为coroutine类型,然后把这个coroutine扔到EventLoop中执行
coroutine .
def hello():
print("Hello World! (%s)" % threading.current_thread());
'''
yield from语法可以让我们方便地调用另一个generator
由于asyncio.sleep()也是一个coroutine,所以线程不会等待asyncio.sleep(),而是直接中断并执行下一个消息循环
当asyncio.sleep()返回时,线程就可以从yield from拿到返回值(此处是None),然后接着执行下一行语句
'''
r = yield from asyncio.sleep(1);
print("Hello World again! (%s)" % threading.current_thread());
def helloLoop():
# 获取EventLoop
loop = asyncio.get_event_loop();
# 执行单个coroutine
# loop.run_until_complete(hello());
# 执行多个coroutine
loop.run_until_complete(asyncio.wait([hello(), hello()]));
loop.close();
# helloLoop();
# ---- asyncio\StreamReader\StreamWriter ----
import asyncio;
import urllib.parse;
coroutine .
def wget(url):
print("wget host %s" % url);
# 解析url,根据scheme来判断端口
urlParse = urllib.parse.urlsplit(url);
port = 443 if (urlParse.scheme == "https") else 80;
'''
打开连接,获取StreamReader(从IO数据流中读数据)、StreamWriter对象(从IO数据流中写数据)
不建议直接实例化reader, writer,而是通过open_connection()或start_server()创建
'''
reader, writer = yield from asyncio.open_connection(url, port);
# 将请求信息写入IO
query = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % url;
writer.write(query.encode("utf-8"));
'''
这是一个与底层IO输入缓冲区交互的流量控制方法
当缓冲区达到上限时,drain()阻塞,待到缓冲区回落到下限时,写操作可以被恢复
当不需要等待时,drain()会立即返回
'''
yield from writer.drain();
while True:
line = yield from reader.readline();
if line == b'\r\n':
break;
print("%s header ---- %s" % (url, line.decode("utf-8")));
# Ignore the body, close the socket
writer.close();
def wgetLoop():
loop = asyncio.get_event_loop();
tasks = [wget(url) for url in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']];
loop.run_until_complete(asyncio.wait(tasks));
loop.close();
# wgetLoop();
# ---- async/await ----
import asyncio;
import threading;
async def hello2():
print("Hello World! (%s)" % threading.current_thread());
r = await asyncio.sleep(1);
print("Hello World again! (%s)" % threading.current_thread());
def helloLoop2():
loop = asyncio.get_event_loop();
loop.run_until_complete(asyncio.wait([hello2(), hello2()]));
loop.close();
helloLoop2();
from aiohttp.web import middleware
async def middleware(request, handler):
resp = await handler(request);
resp.text = resp.text + ' wink';
return resp;
# 调用
# app = web.Application(middlewares=[middleware_1, middleware_2]);
# -*- coding: utf-8 -*-
import asyncio;
from aiohttp import web;
# http://127.0.0.1:8000/
async def index(request):
await asyncio.sleep(1);
resp = web.Response(body=b'<h1>Index</h1>');
# 如果不添加content_type,某些严谨的浏览器会把网页当成文件下载,而不是直接显示
resp.content_type = "text/html;charset=utf-8";
return resp;
# http://127.0.0.1:8000/hello/HymanHu
async def hello(request):
await asyncio.sleep(1);
body = '<h1>hello, %s!</h1>' % request.match_info['name'];
resp = web.Response(body=body.encode("utf-8"));
resp.content_type = "text/html;charset=utf-8";
return resp;
# aiohttp的初始化函数init()也是一个coroutine,loop.create_server()则利用asyncio创建TCP服务
async def init(loop):
# 此处使用loop参数,会出现deprecated警告,直接不用或者补全其他参数
# app = web.Application(loop=loop);
app = web.Application();
app.router.add_route("GET", "/", index);
app.router.add_route("GET", "/hello/{name}", hello);
# 老的写法,会产生deprecated警告
# server = await loop.create_server(app.make_handler(), '127.0.0.1', 8000);
# print('Server started at http://127.0.0.1:8000...');
# return server;
# 新的写法
runner = web.AppRunner(app);
await runner.setup();
server = web.TCPSite(runner, '127.0.0.1', 8000)
print('Server started at http://127.0.0.1:8000...');
await server.start();
loop = asyncio.get_event_loop();
loop.run_until_complete(init(loop));
loop.run_forever();
# ---- socket客户端 ----
def clientSocket():
# 创建socket,指定TCP/IP协议
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM);
# 连接目标,指定ip和端口
s.connect(("www.sina.com.cn", 80));
# 发送请求
s.send(b'GET / HTTP/1.1\r\nHost: www.sina.com.cn\r\nConnection: close\r\n\r\n');
# 处理返回数据
buffer = [];
while True:
# 每次最多接受1k字节
d = s.recv(1024);
if d:
buffer.append(d);
else:
break;
data = b''.join(buffer);
# 关闭socket
s.close();
# 处理返回数据,打印响应头,网页内容写入文件
headers, body = data.split(b'\r\n\r\n', 1);
print(headers.decode("utf-8"));
with open("D:\\temp\\baidu.html", "wb") as f:
f.write(body);
clientSocket();
import socket, threading;
# ---- TCP服务端 ----
def serverSocket():
# 创建基于TCP/IP协议的socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM);
# 监听本机9999端口
s.bind(("127.0.0.1", 9999));
s.listen(5); # 监听最大连接数
# 死循环接受客户端连接
while True:
# 接受一个连接,获得socket和address
sock, addr = s.accept();
# 创建新线程来处理TCP连接
t = threading.Thread(target=tcpLink, args=(sock, addr));
t.start();
# 服务端处理连接方法
def tcpLink(socket, address):
print("Accept new connection from %s:%s" % address);
socket.send(b'wellcome');
while True:
data = socket.recv(1024);
if not data or data.decode("utf-8") == "exit":
break;
socket.send(('Hello %s' % data.decode("utf-8")).encode("utf-8"));
socket.close();
print("Connection is closed from %s" % address);
serverSocket();
# 客户端程序,测试serverSocket
def clientSocketTest():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM);
s.connect("127.0.0.1", 9999);
# 接收欢迎消息
print(s.recv(1024).decode("utf-8"));
for data in [b'HymanHu', b'JiangHu', b'Ye']:
s.send(data);
print(s.recv(1024).decode("utf-8"));
s.send(b'exit');
s.close();
clientSocketTest();
# ---- UDP类型服务端 ----
def serverSocketForUDP():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM);
s.bind(("127.0.0.1", 9999));
print("Bind UDP on 9999..");
while True:
# udp接受数据/发送数据的方式
data, addr = s.recvfrom(1024);
print('Received from %s:%s.' % addr);
s.sendto(b'Hello, %s!' % data, addr);
serverSocketForUDP();
# 客户端程序,测试serverSocketForUDP
def clientSocketTestForUDP():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM);
for data in [b'Michael', b'Tracy', b'Sarah']:
# 发送数据:
s.sendto(data, ('127.0.0.1', 9999));
# 接收数据:
print(s.recv(1024).decode('utf-8'));
s.close();
clientSocketTestForUDP();
from email import encoders;
from email.header import Header;
from email.mime.text import MIMEText;
from email.mime.multipart import MIMEMultipart;
from email.mime.base import MIMEBase;
from email.utils import parseaddr,formataddr;
import smtplib;
# format like: Mr Green <green@example.com>
# return =?utf-8?q?Mr_Green?= <green@example.com>
def formatAddr(x):
name, addr = parseaddr(x);
return formataddr((Header(name, "utf-8").encode(), addr));
print(formatAddr("Mr Green <green@example.com>"));
# ---- 文本邮件 ----
def sendErmail():
# 输入发送人信息、收件人地址
# fromAddress = input("From:");
# password = input("Password:");
# toAddress = input("To:");
# smtpServer = input("SMTP Server:");
fromAddress = "898899721@qq.com";
# 在官网设置开启POP3/SMTP等服务,在此输入第三方授权码
password = "*******";
toAddress = "hujiangyx@163.com,hujiang_cd@hqyj.com";
smtpServer = "smtp.qq.com";
'''
参数1:邮件内容;
参数2:subtype,plain标识纯文本 ---- html网页 ---- alternative混合;
参数3:编码集;
:return:
'''
# 发送纯文本邮件
# msg = MIMEText("Hello HymanHu, 万物生长,山河无恙,佳期如许!", "plain", "utf-8");
# 发送html邮件
# msg = MIMEText('<html><body><h1>Hello</h1>' +
# '<p>send by <a href="http://www.python.org">Python</a>...</p>' +
# '</body></html>', 'html', 'utf-8');
# 发送附件邮件,注意subtype类型
msg = MIMEMultipart("alternative");
# 添加纯文本和html,并在html中嵌入图片
msg.attach(MIMEText("Hello HymanHu, 万物生长,山河无恙,佳期如许!", "plain", "utf-8"));
msg.attach(MIMEText('<html><body><h1>Hello</h1>' +
'<p><img src="cid:0"></p>' +
'</body></html>', 'html', 'utf-8'))
# 添加附件
with open("D:\\temp\\test.png", "rb") as f:
# 设置附件的文件名、类型等:
mime = MIMEBase("image", "png", filename="test.png");
# 添加信息头
mime.add_header('Content-Disposition', 'attachment', filename='test.png')
mime.add_header('Content-ID', '<0>')
mime.add_header('X-Attachment-Id', '0')
# 把附件的内容读进来:
mime.set_payload(f.read());
# 用Base64编码:
encoders.encode_base64(mime);
# 添加到MIMEMultipart:
msg.attach(mime);
# 设置from、to、subject等信息
msg["From"] = formatAddr("HymanHu <%s>" % fromAddress);
msg["To"] = formatAddr("JiangHu <%s>" % toAddress);
msg["Subject"] = Header("来自HymanHu的问候", "utf-8").encode();
print(msg);
server = smtplib.SMTP(smtpServer, 25);
# 加密SMTP
# server.starttls();
# 打印出和SMTP服务器交互的所有信息
server.set_debuglevel(1);
server.login(fromAddress, password);
# 发送单人
# server.sendmail(fromAddress, [toAddress], msg.as_string());
# 发送多人
server.sendmail(fromAddress, toAddress.split(","), msg.as_string());
server.quit();
sendErmail();
# -*- coding: utf-8 -*-
from email.parser import Parser;
from email.utils import parseaddr;
from email.header import decode_header;
import poplib;
# 邮件的内容也是str,还需要检测编码,否则,非UTF-8编码的邮件都无法正常显示
def guessCharset(msg):
charset = msg.get_charset();
if charset is None:
contentType = msg.get('Content-Type', '').lower();
pos = contentType.find('charset=');
if pos >= 0:
charset = contentType[pos + 8:].strip();
return charset;
# 邮件的Subject或者Email中包含的名字都是经过编码后的str,要正常显示,就必须decode
def decodeStr(s):
# 返回一个list,有Cc、Bcc多个邮件地址,我们只取第一个
value, charset = decode_header(s)[0];
if charset:
value = value.decode(charset);
return value;
# 解析邮件返回信息,indent用于缩进显示:
def printInfo(msg, indent=0):
if indent == 0:
for header in ['From', 'To', 'Subject']:
value = msg.get(header, '');
if value:
if header == 'Subject':
value = decodeStr(value);
else:
hdr, addr = parseaddr(value);
name = decodeStr(hdr);
value = u'%s <%s>' % (name, addr);
print('%s%s: %s' % (' ' * indent, header, value));
if (msg.is_multipart()):
parts = msg.get_payload();
for n, part in enumerate(parts):
print('%spart %s' % (' ' * indent, n));
print('%s--------------------' % (' ' * indent));
printInfo(part, indent + 1);
else:
content_type = msg.get_content_type();
if content_type=='text/plain' or content_type=='text/html':
content = msg.get_payload(decode=True);
charset = guessCharset(msg);
if charset:
content = content.decode(charset);
print('%sText: %s' % (' ' * indent, content + '...'));
else:
print('%sAttachment: %s' % (' ' * indent, content_type));
def receiveEmail():
# emailAddress = input("Email Address:");
# password = input("Password:");
# popServer = input("POP Server:");
emailAddress = "898899721@qq.com";
password = "ibqfqoxrfmfubecf";
popServer = "pop.qq.com";
# 连接到pop3服务器
server = poplib.POP3(popServer);
# 打开调试信息
server.set_debuglevel(1);
# 打印pop3服务器欢迎信息
print(server.getwelcome().decode("utf-8"));
# 验证登录用户
server.user(emailAddress);
server.pass_(password);
# 打印邮箱邮件数量、占空间大小
print("MailBox stat: MailCount(%s), MailSize(%s)" % server.stat());
# 返回tuple,共三个元素,第二个元素是所有邮件的编号list
# 类似:(b'+OK', [b'1 1787', b'2 1824'...], 327)
resp, mails, octets = server.list();
print(mails);
# 获取最新的一封邮件,索引从1开始
# mailLines存储了邮件每一行
index = len(mails);
resp, mailLines, octets = server.retr(index);
mailContent = b'\r\n'.join(mailLines).decode("utf-8");
# 将邮件内容解析为Message对象,有可能是MIMEMultipart
mail = Parser().parsestr(mailContent);
printInfo(mail);
# 可以根据邮件索引号直接从服务器删除邮件:
# server.dele(index)
# 关闭连接
server.quit();
receiveEmail();
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__author__ = "HymanHu";
'''
pymysql 组件示例
'''
import pymysql;
def connect_mysql():
connection = pymysql.connect(host='localhost', user='root', passwd='root', db='test', charset='utf8');
cursor = connection.cursor();
return connection, cursor;
def execute_insert_update_delete(cursor, sql):
status = cursor.execute(sql);
return status;
def execute_query(cursor, sql):
cursor.execute(sql);
return cursor.fetchall();
def execute_commit(cursor):
cursor.connection.commit();
def connection_close(connection, cursor):
connection.close();
cursor.close();
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__author__ = "HymanHu";
'''
mysql-connector 组件示例
'''
import mysql.connector;
def mySQLTest():
# 创建数据库连接
conn = mysql.connector.connect(user="root", password="root", host='127.0.0.1',
port='3306', database="test", use_unicode=True);
# 增删改操作
cursor = conn.cursor();
cursor.execute('DROP TABLE IF EXISTS `user`;');
# cursor.execute('create table user (id varchar(20) primary key, name varchar(20))');
cursor.execute('create table user (id INT AUTO_INCREMENT PRIMARY KEY, name varchar(20))');
# 插入一行记录,注意MySQL的占位符是%s:
cursor.execute('insert into user (id, name) values (%s, %s)', ['1', 'HymanHu']);
cursor.execute('insert into user (id, name) values (%s, %s)', ['2', 'Jianghu']);
print(cursor.rowcount);
conn.commit();
cursor.close();
# 查询操作
cursor = conn.cursor();
# cursor.execute('select * from user where id = %s', ('1',));
cursor.execute('select * from user');
values = cursor.fetchall()
print(values);
cursor.close();
# 关闭连接
conn.close();
mySQLTest();
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import pymysql
'''
mysql utils
'''
__author__ = "HymanHu";
def get_connect_cursor():
connect = pymysql.connect(host='localhost', user='root', passwd='root', db='maindb', charset='utf8')
return connect, connect.cursor();
def execute_insert_update_delete(cursor, sql):
result = cursor.execute(sql)
return result
def execute_query(cursor, sql):
cursor.execute(sql)
return cursor.fetchall()
def commit_(connect):
connect.commit()
def rollback_(connect):
connect.rollback()
def close_connect_cursor(connect, cursor):
if connect:
connect.close()
if cursor:
cursor.close()
def execute_(sql):
connect, cursor = None, None
result = None
try:
connect, cursor = get_connect_cursor()
if sql.startswith("select"):
result = execute_query(cursor, sql)
else:
result = execute_insert_update_delete(cursor, sql)
commit_(connect)
except Exception as e:
print(e)
finally:
close_connect_cursor(connect, cursor)
return result
def batch_excute_(sqls):
connect, cursor = None, None
try:
connect, cursor = get_connect_cursor()
for sql in sqls:
cursor.execute(sql)
commit_(connect)
except Exception as e:
print(e)
finally:
close_connect_cursor(connect, cursor)
if __name__ == "__main__":
pass
private String lincensePlateRecognition(String filePath, String path) {
String result = "";
BufferedReader in = null;
BufferedReader errorIn = null;
try {
String systemName = System.getProperty("os.name");
filePath = systemName.toLowerCase().startsWith("win") ?
pythonBaseDirForWindow + filePath :
pythonBaseDirForLinux + filePath;
// 第一个元素是执行命令,第二个元素是文件路径,第三个是调用模块参数
String[] args = new String[] {"python", filePath, path};
Process proc = Runtime.getRuntime().exec(args);
// Linux 系统使用 UTF-8 编码
in = new BufferedReader(new InputStreamReader(proc.getInputStream(), "UTF-8"));
errorIn = new BufferedReader(new InputStreamReader(proc.getErrorStream(), "UTF-8"));
String line = null;
while ((line = in.readLine()) != null) {
LOGGER.debug(String.format("======== %s", line));
Matcher matcher = Pattern.compile("result:\\[(.*?)\\]").matcher(line);
if (matcher != null && matcher.find()) {
result = matcher.group(1);
}
}
while ((line = errorIn.readLine()) != null) {
LOGGER.debug(String.format("======== %s", line));
}
proc.waitFor();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
try {
if (in != null) {
in.close();
}
if (errorIn != null) {
errorIn.close();
}
} catch (Exception e2) {
e2.printStackTrace();
}
}
return result;
}
public List<String> updateBicolorSpheres(int issueCount) {
List<String> result = new ArrayList<>();
try {
String systemName = System.getProperty("os.name");
String filePath = systemName.toLowerCase().startsWith("win") ?
pythonBaseDirForWindow + "/spider/bicolor_sphere_spider.py" :
pythonBaseDirForLinux + "/spider/bicolor_sphere_spider.py";
// 第一个元素是执行命令,第二个元素是文件路径,第三个是调用模块参数
String[] args = new String[] {"python", filePath, String.valueOf(issueCount)};
Process proc = Runtime.getRuntime().exec(args);
BufferedReader in = new BufferedReader(new InputStreamReader(proc.getInputStream(), "gbk"));
String line = null;
while ((line = in.readLine()) != null) {
result.add(line);
}
in.close();
proc.waitFor();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return result;
}
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__author__ = "HuJiang";
# 将项目根目录添加到 sys.path,解决 cmd 下执行该模块找不到包的问题
import sys, os;
current_path = os.path.abspath(os.path.dirname(__file__));
separator = "\\" if os.name == "nt" else "/";
project_name = "jianghu_python" + separator;
root_path = current_path[:current_path.find(project_name) + len(project_name)]; # 获取项目根目录
sys.path.append(root_path);
if __name__ == "__main__":
# parse_ssq_list_page(page_count=147);
issue_count = 100;
page_size = 100;
# 外部传递参数从第二位开始
if len(sys.argv) > 1:
issue_count = int(sys.argv[1]);
page_size = int(sys.argv[1]);
parse_ssq_page(issue_count, page_size);
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__author__ = "JiangHu";
# 将其他项目根目录添加到 sys.path
import sys, os;
sys.path.append(r'D:\projectCode\sfac_project\python_wealth_code');
from django.shortcuts import render, redirect;
import json;
from django.http import JsonResponse;
from django.db.models import Q;
from django.core.paginator import Paginator, PageNotAnInteger, EmptyPage;
from util.PageVo import *;
from app_lottery.models import *;
from spider.bicolor_sphere_spider import *;
from data_processing.bicolor_sphere_forecast import *;
# Create your views here.
#127.0.0.1:8080/api/bicolorSpheres/spider ---- get
def spiderBicolorSphere(request):
if request.method == "GET":
parse_ssq_page(10, 10);
return JsonResponse(Result(200, "Update success.").result());
else:
return JsonResponse(Result(500, "Unsurport request method.").result());
#127.0.0.1:8080/api/bicolorSpheres/forecast ---- get
def forecast(request):
if request.method == "GET":
df = init_data();
ses_forecast(df);
holt_forecast(df);
lstm_forecast(df);
random_forecast();
result = save_forecast_result();
return JsonResponse(Result(200, "Forecast success.", result).result());
else:
return JsonResponse(Result(500, "Unsurport request method.").result());