Python脚本备份

Python脚本备份

九月 13, 2023

把图片上传到 sm.ms 图床

sm.ms.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# coding=utf-8
# Upload every image in files_path to the sm.ms image host, recording the
# returned URL / hash / filename; on any error, delete the already-uploaded
# local files and retry after 900 seconds.

from time import sleep
import requests, os

url = "https://sm.ms/api/v2/upload"
headers = {
'Connection': 'close',
'Authorization': 'key'  # sm.ms API token goes here
}


files_path = r'C:\Users\Administrator\Desktop\pic'
ufile = r'C:\Users\Administrator\Desktop\url.txt'   # filename + URL log
hfile = r'C:\Users\Administrator\Desktop\hash.txt'  # sm.ms deletion hashes
dfile = r'C:\Users\Administrator\Desktop\done.txt'  # names of uploaded files

while True:
    try:
        pfiles = os.listdir(files_path)  # scan the source folder
        for file in pfiles:
            fpath = os.path.join(files_path, file)
            print('开始上传' + fpath)
            # Context manager closes the handle even if the upload raises
            # (the original leaked it on exceptions).
            with open(fpath, 'rb') as pic:
                files = {
                    'smfile': pic  # file payload
                }
                r = requests.post(url, headers=headers, files=files, timeout=60)  # upload
            print(r.status_code)
            r.encoding = 'utf-8'
            # Decode the JSON once instead of three times.
            data = r.json()
            print(data)
            t = fpath + "_inurl_" + data['data']['url'] + "\n"  # filename + url
            pichash = data['data']['hash']
            with open(ufile, "a") as f:  # append to the URL log
                f.write(t)
            print(t)
            with open(hfile, "a") as f:
                f.write(pichash + "\n")
            print("图片哈希为" + pichash)
            with open(dfile, "a") as f:
                f.write(file + "\n")
            print("图片信息保存完成" + "\n")
            sleep(5)
    except Exception:
        # Named exception instead of a bare except: Ctrl+C / SystemExit can
        # still stop the script.
        print("程序出错了,开始删除本地已上传的文件" + "\n")
        with open(dfile, "r+") as f:
            for fname in f:
                # rstrip instead of [:-1]: the last line may have no trailing
                # newline, and [:-1] would then chop a real character.
                fpath = os.path.join(files_path, fname.rstrip("\n"))
                os.remove(fpath)
                print("已删除" + fpath)
            f.truncate(0)
        print("文件删除完成,900秒后重试上传" + "\n")
        sleep(900)

已知图片API的URL的情况下批量下载图片

picdl.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# coding=utf-8
# Download every image URL listed (one per line) in E:\acg\acgurl.txt
# and save them as acgpic<i>.jpg under filepath.

import requests

filepath = 'E:/acg/acg/'  # directory the images are saved to
i = 1

# Image URLs are stored in E:\acg\acgurl.txt
with open(r'E:\acg\acgurl.txt', 'r', encoding='UTF-8') as f:
    for url in f:
        # Strip the trailing newline — the original passed "http://...\n"
        # straight to requests. Skip blank lines entirely.
        url = url.strip()
        if not url:
            continue
        r = requests.get(url)  # GET request
        print(r.status_code)

        # Write the response body to acgpic<i>.jpg; the with-block closes the
        # file, so the explicit close() the original had was redundant.
        with open(filepath + 'acgpic' + str(i) + '.jpg', 'wb') as p:
            p.write(r.content)
        i += 1

图片 MD5 去重

md5quchong.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# coding=utf-8

import os
import hashlib

filedir = 'E:\\acg\\acg'

def filecount(DIR):
    """Return the number of regular files (directories excluded) directly inside DIR."""
    # Avoid shadowing the function's own name with a local, as the original did.
    return len([name for name in os.listdir(DIR)
                if os.path.isfile(os.path.join(DIR, name))])


def md5sum(filename):
    """Return the hex MD5 digest of the file *filename* inside the global *filedir*.

    Reads in 8 KiB chunks so large images never have to fit in memory.
    """
    md5 = hashlib.md5()
    # with-statement closes the handle even if read() raises
    # (the original open()/close() pair leaked it on exceptions).
    with open(filedir + '/' + filename, 'rb') as f:
        while True:
            fb = f.read(8096)
            if not fb:
                break
            md5.update(fb)
    return md5.hexdigest()


def delfile():
    """Delete files under *filedir* whose MD5 duplicates one seen earlier; print each victim."""
    all_md5 = {}
    # Don't shadow the builtin `dir` as the original did.
    for dirpath, dirnames, filenames in os.walk(filedir):
        for tlie in filenames:
            # Hash each file once — the original called md5sum() twice per file,
            # doubling the I/O for every non-duplicate.
            digest = md5sum(tlie)
            if digest in all_md5.values():
                os.remove(filedir + '/' + tlie)
                print(tlie)
            else:
                all_md5[tlie] = digest
    # NOTE(review): md5sum() and os.remove() join names onto the top-level
    # filedir, so files inside nested sub-directories would be looked up at the
    # wrong path — confirm the folder is flat.


if __name__ == '__main__':
    # Count before, deduplicate, then report how many files were removed.
    before = filecount(filedir)
    print('去重前有', before, '个文件\n请稍等正在删除重复文件...')
    delfile()
    print('\n\n去重后剩', filecount(filedir), '个文件')
    print('\n\n一共删除了', before - filecount(filedir), '个文件\n\n')

将图片按分辨率归类,将尺寸大小相同的图片放在一个新文件夹当中

size4.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# coding=utf-8
# Copy every image in files_path into a sub-folder chosen by its aspect ratio.

import os
import shutil
from PIL import Image

files_path = r'E:\pic\1'
suffixlist = ['.Webp', '.BMP', '.JPEG', '.RAW', '.JPG', '.PNG', '.webp', '.bmp', '.jpeg', '.raw', '.jpg', '.png',]


def photo_classify(files_path):
    """Sort the images directly inside *files_path* into sub-folders by aspect ratio.

    pe = portrait (0.45 <= w/h <= 0.72), pc = landscape (1.50 <= w/h <= 2.30),
    pp = exactly square, other = everything else.  Images are copied, not moved.
    """
    files_list = os.listdir(files_path)

    for photo in files_list:
        photo_abspath = os.path.join(files_path, photo)

        # Skip sub-directories (including the target folders created below).
        if os.path.isdir(photo_abspath):
            print(photo_abspath + '是文件夹')
            continue
        elif os.path.splitext(photo_abspath)[-1] not in suffixlist:
            print(photo_abspath + "的文件后缀不是'.Webp', '.BMP', '.JPEG', '.RAW', '.JPG', '.PNG', '.bmp', '.jpeg', '.raw', '.jpg', '.png',中的一个")
        else:
            print('处理' + photo_abspath)
            # Close the image promptly — the original leaked the handle, which
            # keeps the file locked on Windows.
            with Image.open(photo_abspath) as img:
                width = img.size[0]
                height = img.size[1]
            ratio = float(width) / float(height)
            print('宽' + str(width) + '高' + str(height) + '比例' + str(ratio))

            if 0.45 <= ratio <= 0.72:
                picKind = 'pe'
            elif 1.50 <= ratio <= 2.30:
                picKind = 'pc'
            elif width == height:
                picKind = 'pp'
            else:
                picKind = 'other'

            new_path = os.path.join(files_path, picKind)
            print(new_path)
            # makedirs(exist_ok=True) replaces the racy isdir()/mkdir() pair.
            os.makedirs(new_path, exist_ok=True)
            shutil.copyfile(photo_abspath, os.path.join(new_path, photo))
            print('分类成功!')


if __name__ == '__main__':
    photo_classify(files_path)

将图片按分辨率归为四类

size4.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# coding=utf-8
# Copy every image in files_path into one of four sub-folders chosen by aspect ratio.

import os
import shutil
from PIL import Image

files_path = r'E:\pic\1'
suffixlist = ['.Webp', '.BMP', '.JPEG', '.RAW', '.JPG', '.PNG', '.webp', '.bmp', '.jpeg', '.raw', '.jpg', '.png',]


def photo_classify(files_path):
    """Classify the images directly inside *files_path* into four ratio buckets.

    pe = portrait (0.45 <= w/h <= 0.72), pc = landscape (1.50 <= w/h <= 2.30),
    pp = exactly square, other = everything else.  Images are copied, not moved.
    """
    files_list = os.listdir(files_path)

    for photo in files_list:
        photo_abspath = os.path.join(files_path, photo)

        # Folders (including previously created bucket folders) are skipped.
        if os.path.isdir(photo_abspath):
            print(photo_abspath + '是文件夹')
            continue
        elif os.path.splitext(photo_abspath)[-1] not in suffixlist:
            print(photo_abspath + "的文件后缀不是'.Webp', '.BMP', '.JPEG', '.RAW', '.JPG', '.PNG', '.bmp', '.jpeg', '.raw', '.jpg', '.png',中的一个")
        else:
            print('处理' + photo_abspath)
            # Read the size and close the handle right away — the original
            # never closed it, which keeps the file locked on Windows.
            with Image.open(photo_abspath) as img:
                width = img.size[0]
                height = img.size[1]
            ratio = float(width) / float(height)
            print('宽' + str(width) + '高' + str(height) + '比例' + str(ratio))

            if 0.45 <= ratio <= 0.72:
                picKind = 'pe'
            elif 1.50 <= ratio <= 2.30:
                picKind = 'pc'
            elif width == height:
                picKind = 'pp'
            else:
                picKind = 'other'

            new_path = os.path.join(files_path, picKind)
            print(new_path)
            # Single atomic-ish call instead of the racy isdir()-then-mkdir() pair.
            os.makedirs(new_path, exist_ok=True)
            shutil.copyfile(photo_abspath, os.path.join(new_path, photo))
            print('分类成功!')


if __name__ == '__main__':
    photo_classify(files_path)

将通过 API 获取的 json 信息写入 txt 文件

json_into_txt.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# coding=utf-8
# Query the acg JSON API repeatedly and append each returned image URL to tfile.

import requests, json

tfile = "E:/acg/acgurl.txt"
url = 'https://www.loliapi.com/acg?type=json'

# The original counted i from 1 while i < 100, i.e. 99 requests in total;
# a for-range replaces the manual counter.
for _ in range(99):
    # timeout so one hung request cannot stall the loop forever
    r = requests.get(url, timeout=30)
    text = json.loads(r.text)
    t = text['imgurl'] + "\n"

    # with closes the file, so the explicit close() the original had was redundant
    with open(tfile, "a") as f:
        f.write(t)
    print(t)

将网页里带 img标签 的图片的地址写入 txt 文件(不知道能不能用)

htmlimg2txt.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# coding=utf-8
# Collect the src of every <img> nested in an <a> tag on one page into tfile.

import requests, json
from bs4 import BeautifulSoup

tfile = "E:/acg/picurl.txt"

url = ' '  # target page URL goes here

r = requests.get(url, timeout=30)  # fetch the page
soup = BeautifulSoup(r.text, "lxml")
for a in soup.find_all('a'):
    if a.img:
        src = a.img['src']

        with open(tfile, "a") as f:
            # One URL per line — the original wrote them back-to-back with no
            # separator, so line-oriented consumers (e.g. picdl.py) got one
            # giant unusable line.
            f.write(src + "\n")
        print(src)

文件批量重命名

rename.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# coding=utf-8
# Rename every file in files_path to img<N> while keeping its extension.

import os

files_path = r'E:\acg\acg'  # directory containing the files to rename


num = 1
newname = "img"

for filename in os.listdir(files_path):
    portion = os.path.splitext(filename)
    nname = newname + str(num) + portion[1]
    # Rename via absolute paths — the original called os.chdir() on every
    # iteration, mutating process-global state for no benefit.
    os.rename(os.path.join(files_path, filename),
              os.path.join(files_path, nname))
    print("文件" + filename + "已修改为" + nname)
    num = num + 1

文件批量改后缀

resuffix.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# coding=utf-8
# Batch-change the extension of files in files_path.

import os

files_path = r'E:\acg\acg'  # directory containing the files

originalsuffix = "all"  # extension to change (without the dot), or 'all' for every file
suffix = "png"          # target extension (without the dot)

for filename in os.listdir(files_path):
    portion = os.path.splitext(filename)

    # BUG FIX: splitext keeps the leading dot ('.png') while originalsuffix is
    # written without one ('png'), so the original comparison never matched —
    # compare against the dot-stripped extension instead.
    if originalsuffix == "all" or portion[1].lstrip(".") == originalsuffix:
        # Re-assemble stem + new extension
        newname = portion[0] + "." + suffix
        # Absolute paths instead of os.chdir() in the loop (process-global state)
        os.rename(os.path.join(files_path, filename),
                  os.path.join(files_path, newname))
        print("文件" + filename + "已修改为" + newname)

图片格式转换

pic2webp.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# coding=utf-8
# Convert every image under files_path (recursively) to the target format,
# optionally deleting the source afterwards.

import os
from PIL import Image


files_path = r'E:\acg\acg'  # directory to convert

suffix = "webp"            # target format (extension, without the dot)
del_original_img = "true"  # delete the source image afterwards ("true"/"false")
for dpath, dname, dfiles in os.walk(files_path, topdown=False):
    for fname in dfiles:
        imgpath = os.path.join(dpath, fname)
        portion = os.path.splitext(fname)
        suffixfname = os.path.join(dpath, portion[0] + "." + suffix)
        # Close the source image before deleting it — the original never
        # closed the handle, and an open handle makes os.remove() fail with
        # PermissionError on Windows.
        with Image.open(imgpath) as im:
            if suffix == "jpg" or suffix == "jpeg":
                im = im.convert("RGB")  # JPEG cannot store an alpha channel
            im.save(suffixfname)
        print(imgpath + "成功转换为" + suffixfname)
        if del_original_img == "true":
            if imgpath == suffixfname:
                print("目标文件" + imgpath + "已是" + suffix + "格式,为防止误删,跳过此文件")
            else:
                os.remove(imgpath)
                print("已删除源文件" + imgpath)

通过代理访问文章URL(刷阅读量)

articles_read.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# coding=utf-8
# Visit a list of article URLs through rotating proxies (read-count booster).

import requests
import time
import random


def get_proxy():
    """Fetch one proxy (ip, port) pair from the vendor API; raise on any failure.

    Raising (instead of the original's implicit None return) lets the caller's
    retry loop report the real reason rather than an unpacking TypeError.
    """
    url = "http://route.xiongmaodaili.com/xiongmao-web/api/glip"  # proxy vendor API
    params = {
        "secret": "8f77e82c****************5015b5ce",
        "orderNo": "GL2023***********v4jT0k",
        "count": 1,
        "isTxt": 0,
        "proxyType": 1
    }
    print("正在请求代理地址")
    response = requests.get(url, params=params, timeout=10)

    if response.status_code == 200:
        data = response.json()
        if data["code"] == "0":
            print("代理地址请求成功")
            return data["obj"][0]["ip"], data["obj"][0]["port"]
        raise RuntimeError(f"Error: {data['msg']}")
    raise RuntimeError(f"Error: Unable to fetch proxy. Status code {response.status_code}")


def visit_url(url, proxy):
    """GET *url* once through *proxy*, presenting ordinary desktop-Chrome headers."""
    headers = {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'Accept-Language': 'zh-CN,zh;q=0.9',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
        'Pragma': 'no-cache',
        'Sec-Fetch-Dest': 'document',
        'Sec-Fetch-Mode': 'navigate',
        'Sec-Fetch-Site': 'none',
        'Sec-Fetch-User': '?1',
        'Upgrade-Insecure-Requests': '1',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36',
        'sec-ch-ua': '"Chromium";v="116", "Not)A;Brand";v="24", "Google Chrome";v="116"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Windows"',
    }
    proxies = {
        'https': proxy,
        'http': proxy
    }
    print(f"尝试通过代理 {proxy} 访问 {url}")
    response = requests.get(url, headers=headers, proxies=proxies, timeout=10)
    if response.status_code == 200:
        print(f"成功访问URL:{url} 通过代理:{proxy}")
    else:
        print(f"访问URL:{url} 失败,状态码:{response.status_code}")


url_list = [  # URLs to visit
    "https://blog.csdn.net/weixin_71282685/article/details/134096516",
    "https://blog.csdn.net/weixin_71282685/article/details/134039593"
]

# The original wrapped everything in `while True:` with the function
# definitions INSIDE the loop body, re-creating them every iteration and —
# worse — spinning in a tight infinite loop after the 700th visit finished.
# The retry behaviour is preserved by looping on the counter itself.
proxy_count = 1
while proxy_count <= 700:  # total number of visit rounds
    try:
        proxy_ip, proxy_port = get_proxy()
        proxy = f"http://{proxy_ip}:{proxy_port}"
        for url in url_list:
            visit_url(url, proxy)
        rest_time = random.randint(5, 12)
        for i in range(rest_time):
            print(f"正在休息,还剩{rest_time-i}秒")
            time.sleep(1)
        print(f"已经完成第 {proxy_count} 次访问")
        proxy_count += 1
    except Exception as e:
        print(f"发生错误:{e},等待10秒再尝试")
        time.sleep(10)

向企业微信机器人推送消息

message.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# coding=utf-8

import requests
import json


def send_message_to_webhook(webhook_url, message):
    """POST *message* to an enterprise-WeChat webhook as a markdown card."""
    payload = json.dumps({
        "msgtype": "markdown",
        "markdown": {"content": message},
    })
    resp = requests.post(webhook_url,
                         headers={'Content-Type': 'application/json'},
                         data=payload)

    if resp.status_code == 200:
        print('消息发送成功')
    else:
        print(f'请求失败,状态码:{resp.status_code},原因:{resp.text}')


webhook_url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=693a91f6-7xxx-4bc4-97a0-0ec2sifa5aaa'
message = 'python消息推送测试'

send_message_to_webhook(webhook_url, message)

Hostloc签到

loc.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
import os
import time
import random
import re
import textwrap
import requests
import json
from pyaes import AESModeOfOperationCBC
from requests import Session as req_Session


# Build a batch of random hostloc user-space URLs.
def randomly_gen_uspace_url() -> list:
    """Return 12 random hostloc user-space URLs.

    Visiting a banned user's space earns no points and the random uids may
    repeat, so two extra links are generated as redundancy.
    """
    return [
        "https://hostloc.com/space-uid-{}.html".format(str(random.randint(10000, 60000)))
        for _ in range(12)
    ]


# Python port of the toNumbers() JS helper found on the anti-CC challenge page.
def toNumbers(secret: str) -> list:
    """Split a hex string into consecutive 2-character chunks, parsed as ints."""
    return [int(chunk, 16) for chunk in textwrap.wrap(secret, 2)]


# Hit the forum home page without cookies to detect the anti-CC challenge and,
# when active, return everything needed for the AES computation in one dict.
def check_anti_cc() -> dict:
    """Probe hostloc's home page and collect anti-CC AES parameters.

    Returns {} when the challenge is off.  Otherwise "ok" is 1 and the keys
    "a"/"b"/"c"/"cookie_name" are present, or "ok" is 0 when the page looked
    like a challenge but the parameters were not in the expected shape.
    """
    result_dict = {}
    headers = {
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36"
    }
    home_page = "https://hostloc.com/forum.php"
    res = requests.get(home_page, headers=headers)
    # Raw strings: '\(' in a plain literal is an invalid escape sequence
    # (DeprecationWarning today, SyntaxError in future Python versions).
    aes_keys = re.findall(r'toNumbers\("(.*?)"\)', res.text)
    cookie_name = re.findall(r'cookie="(.*?)="', res.text)

    if len(aes_keys) != 0:  # challenge is active
        print("检测到防 CC 机制开启!")
        printLog("检测到防 CC 机制开启!")
        if len(aes_keys) != 3 or len(cookie_name) != 1:
            # Matched, but not the expected parameter counts — abnormal page.
            result_dict["ok"] = 0
        else:
            result_dict["ok"] = 1
            result_dict["cookie_name"] = cookie_name[0]
            result_dict["a"] = aes_keys[0]
            result_dict["b"] = aes_keys[1]
            result_dict["c"] = aes_keys[2]

    return result_dict


# Derive the anti-CC verification cookie via AES-CBC decryption when the
# challenge is active; returns an empty dict when it is not.
def gen_anti_cc_cookies() -> dict:
    """Return the cookie dict needed to pass hostloc's anti-CC check (possibly empty)."""
    cookies = {}
    status = check_anti_cc()

    if not status:  # empty dict — challenge not active, nothing to compute
        return cookies

    if status["ok"] == 0:
        print("防 CC 验证过程所需参数不符合要求,页面可能存在错误!")

        printLog("防 CC 验证过程所需参数不符合要求,页面可能存在错误!")
    else:
        # Decrypt c with key a and IV b (AES Cipher-Block Chaining); the hex of
        # the plaintext is the cookie value the challenge JS would have set.
        print("自动模拟计尝试通过防 CC 验证")
        printLog("自动模拟计尝试通过防 CC 验证")
        key = bytes(toNumbers(status["a"]))
        iv = bytes(toNumbers(status["b"]))
        ciphertext = bytes(toNumbers(status["c"]))
        plaintext = AESModeOfOperationCBC(key, iv).decrypt(ciphertext)

        cookies[status["cookie_name"]] = plaintext.hex()

    return cookies


# Log into one hostloc account.
def login(username: str, password: str) -> req_Session:
    """Log in via the quick-login form and return the authenticated session."""
    s = req_Session()
    s.headers.update({
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36",
        "origin": "https://hostloc.com",
        "referer": "https://hostloc.com/forum.php",
    })
    # Attach the anti-CC cookie (if any) before posting, or the login request
    # would be answered by the challenge page instead of the forum.
    s.cookies.update(gen_anti_cc_cookies())

    login_url = "https://hostloc.com/member.php?mod=logging&action=login&loginsubmit=yes&infloat=yes&lssubmit=yes&inajax=1"
    form = {
        "fastloginfield": "username",
        "username": username,
        "password": password,
        "quickforward": "yes",
        "handlekey": "ls",
    }
    res = s.post(url=login_url, data=form)
    res.raise_for_status()
    return s


# Verify the login by scraping the title of the user-settings page.
def check_login_status(s: req_Session, number_c: int) -> bool:
    """Return True when session *s* (account #number_c) is logged in, else False."""
    test_url = "https://hostloc.com/home.php?mod=spacecp"
    res = s.get(test_url)
    res.raise_for_status()
    res.encoding = "utf-8"
    # Raw string: "\/" is an invalid escape sequence in a plain literal
    # (DeprecationWarning today, SyntaxError in future Python versions).
    test_title = re.findall(r"<title>(.*?)<\/title>", res.text)

    if len(test_title) != 0:  # guard against an empty match list before indexing
        if test_title[0] != "个人资料 - 全球主机交流论坛 - Powered by Discuz!":
            print("第", number_c, "个帐户登录失败!")
            printLog("第" + str(number_c) + "个帐户登录失败!")
            return False
        else:
            print("第", number_c, "个帐户登录成功!")
            printLog("第" + str(number_c) + "个帐户登录成功!")
            return True
    else:
        print("无法在用户设置页面找到标题,该页面存在错误或被防 CC 机制拦截!")
        return False


# Scrape the forum home page and report the account's current credit total.
def print_current_points(s: req_Session):
    """Print (and log) the credit count of the account behind session *s*."""
    test_url = "https://hostloc.com/forum.php"
    res = s.get(test_url)
    res.raise_for_status()
    res.encoding = "utf-8"
    # Raw string: "\d" is an invalid escape sequence in a plain literal
    # (DeprecationWarning today, SyntaxError in future Python versions).
    points = re.findall(r"积分: (\d+)", res.text)

    if len(points) != 0:  # guard against an empty match list before indexing
        print("帐户当前积分:" + points[0])
        printLog("帐户当前积分:" + points[0])
    else:
        print("无法获取帐户积分,可能页面存在错误或者未登录!")
        printLog("无法获取帐户积分,可能页面存在错误或者未登录!")
    # NOTE(review): placed at function level (pacing every points check); the
    # source's flattened indentation makes the original placement ambiguous —
    # confirm it was not meant to run only on the failure branch.
    time.sleep(5)


# Visit the randomly generated user-space links one by one to earn points.
def get_points(s: req_Session, number_c: int):
    """Earn daily points for account #number_c by visiting random user spaces."""
    # Guard clause: bail out early when the session is not actually logged in.
    if not check_login_status(s, number_c):
        print("请检查你的帐户是否正确!")
        printLog("请检查你的帐户是否正确!")
        return

    print_current_points(s)  # points before the run
    # A failure on one link must not abort the remaining visits.
    for idx, url in enumerate(randomly_gen_uspace_url(), start=1):
        try:
            res = s.get(url)
            res.raise_for_status()
            print("第", idx, "个用户空间链接访问成功")
            printLog("第" + str(idx) + "个用户空间链接访问成功")
            time.sleep(5)  # pace the visits to stay under the anti-CC radar
        except Exception as e:
            print("链接访问异常:" + str(e))
            printLog("链接访问异常:" + str(e))
    print_current_points(s)  # points after the run


# Report the public IP address the script is currently running from (best effort).
def print_my_ip():
    """Fetch the current outbound IP from the loliapi service and print/log it."""
    try:
        res = requests.get(url="https://www.loliapi.com/getip/?type=ip",
                           headers={'py': 'yes'})
        res.raise_for_status()
        res.encoding = "utf-8"
        print("当前使用 ip 地址:" + res.text)
        printLog("当前使用 ip 地址:" + res.text)
    except Exception as e:
        # Purely informational — a lookup failure must not stop the sign-in run.
        print("获取当前 ip 地址失败:" + str(e))
        printLog("获取当前 ip 地址失败:" + str(e))


# Push the accumulated log lines to an enterprise-WeChat webhook in one message.
def send_message_to_webhook(webhook_url, messageContent):
    """Join *messageContent* with newlines and POST it as a markdown card."""
    body = json.dumps({
        "msgtype": "markdown",
        "markdown": {"content": "\n".join(messageContent)},
    })
    response = requests.post(webhook_url,
                             headers={'Content-Type': 'application/json'},
                             data=body)

    if response.status_code == 200:
        print('消息发送成功')
    else:
        print(f'请求失败,状态码:{response.status_code},原因:{response.text}')




# Collect one log line for the final webhook push.
def printLog(log: str):
    # messageContent is the module-level list created in the __main__ block;
    # everything appended here is joined and sent in a single message at exit.
    messageContent.append(log)


if __name__ == "__main__":
    # Accounts and passwords, comma separated and index-aligned ("u1,u2" / "p1,p2").
    username = "username"
    password = "password"
    # Enterprise-WeChat webhook for the summary push
    webhook_url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=b******6-a**d-4**e-9**4-614******c73'
    messageContent = []
    # username = os.environ["HOSTLOC_USERNAME"]
    # password = os.environ["HOSTLOC_PASSWORD"]

    # Split the comma-separated credentials into parallel lists.
    user_list = username.split(",")
    passwd_list = password.split(",")

    if not username or not password:
        printLog("未检测到用户名或密码,请检查环境变量是否设置正确!")
        print("未检测到用户名或密码,请检查环境变量是否设置正确!")
    elif len(user_list) != len(passwd_list):
        printLog("用户名与密码个数不匹配,请检查环境变量设置是否错漏!")
        print("用户名与密码个数不匹配,请检查环境变量设置是否错漏!")
    else:
        print_my_ip()
        print("共检测到", len(user_list), "个帐户,开始获取积分")
        print("*" * 30)
        printLog("共检测到" + str(len(user_list)) + "个帐户,开始获取积分")
        printLog("*" * 30)
        # One account at a time; a failure on one must not stop the rest.
        for idx, (user, passwd) in enumerate(zip(user_list, passwd_list), start=1):
            try:
                session = login(user, passwd)
                get_points(session, idx)
                print("*" * 30)

                printLog("*" * 30)
            except Exception as e:
                print("程序执行异常:" + str(e))
                print("*" * 30)

                printLog("程序执行异常:" + str(e))
                printLog("*" * 30)

    print("程序执行完毕,获取积分过程结束")
    printLog("程序执行完毕,获取积分过程结束")
    send_message_to_webhook(webhook_url, messageContent)

测试 域名/IP 能否 TCPing 通

tcping.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import socket
import requests
import json

targets = [
("example.com", [80, 443, 8080]),
("google.com", [80, 443]),
("192.168.1.1", [22, 80, 3389])
]

webhook_url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=693a91f6-7xxx-4bc4-97a0-0ec2sifa5aaa'

def get_ip_info():
    """Fetch geo info about the current public IP; always return a dict.

    On any failure the dict carries "status"/"error_message" so the caller can
    render a failure line instead of crashing — the original implicitly
    returned None on non-200 responses, and build_message() then raised
    TypeError on `"error_message" in None`.
    """
    ip_api_url = "https://www.loliapi.com/getip/"
    ip_api_headers = {"py": "yes"}
    try:
        print("开始获取IP信息...")
        response = requests.get(ip_api_url, headers=ip_api_headers)
        if response.status_code == 200:
            ip_data = response.json()
            print('获取IP信息成功')
            return ip_data
        # Non-200: return an error dict instead of falling through to None.
        return {"status": "error", "error_message": f"HTTP {response.status_code}"}
    except Exception as e:
        print(f"获取IP信息失败:{str(e)}")
        return {"status": "error", "error_message": str(e)}

def tcping(host, port):
    """Attempt a 5-second TCP connect to (host, port); return a status string.

    Returns "可以连通" on success, "不能连通" on timeout/refusal, or
    "出错: ..." for any other socket error.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(5)
    try:
        print(f"开始测试主机 {host} 的 {port} 端口能否连通...")
        sock.connect((host, port))
        print(f"主机 {host} 的 {port} 端口可以连通")
        return "可以连通"
    except (socket.timeout, ConnectionRefusedError):
        print(f"主机 {host} 的 {port} 端口不能连通")
        return "不能连通"
    except socket.error as e:
        print(f"出错: {str(e)}")
        return f"出错: {str(e)}"
    finally:
        # Close on every path — the original only closed on success and leaked
        # the socket on timeout/refused/error.
        sock.close()

def build_message(ip_info, targets):
    """Run every TCPing probe in *targets* and assemble the markdown push message."""
    print('开始TCPing测试并构建推送消息')
    if "error_message" in ip_info:
        message = f"当前脚本运行服务器IP信息获取失败:<font color='warning'>{ip_info['error_message']}</font>"
    else:
        message = f"""当前脚本运行服务器IP地址:<font color="blue">**{ip_info['ip']}**</font>
该IP位于<font color="yellow">{ip_info['continent']}</font>,<font color="yellow">{ip_info['country']}</font>,<font color="orange">**{ip_info['city']}**</font>
TCPing测试结果如下
"""

    for host, port_list in targets:
        for port in port_list:
            outcome = tcping(host, port)
            # Green for reachable, red for anything else — same output text as before.
            color = 'green' if "可以连通" in outcome else 'red'
            message += f"> 主机 **{host}** 的 **{port}** 端口<font color='{color}'>{outcome}</font> \n"
    print('推送消息构建完成')
    return message

def send_message_to_webhook(message):
    """POST *message* to the module-level webhook_url as a markdown card."""
    payload = {
        "msgtype": "markdown",
        "markdown": {"content": message},
    }

    print('开始发送消息')
    response = requests.post(webhook_url,
                             headers={'Content-Type': 'application/json'},
                             data=json.dumps(payload))

    if response.status_code == 200:
        print('消息发送成功')
    else:
        print(f'请求失败,状态码:{response.status_code},原因:{response.text}')

if __name__ == "__main__":
    # Gather the server's own IP info, run all TCPing probes, push one summary.
    ip_info = get_ip_info()
    message = build_message(ip_info, targets)
    send_message_to_webhook(message)