diff --git a/Schedule/demo4.py b/Schedule/demo4.py
index 9c7d775..097017f 100644
--- a/Schedule/demo4.py
+++ b/Schedule/demo4.py
@@ -10,7 +10,7 @@
def greet(name):
- print('Hello {}'.format(name))
+ print(f'Hello {name}')
# .tag 打标签
diff --git a/Schedule/demo8.py b/Schedule/demo8.py
index c4181e3..6f2f6ac 100644
--- a/Schedule/demo8.py
+++ b/Schedule/demo8.py
@@ -11,11 +11,11 @@
不过你可以通过多线程的形式来运行每个作业以解决此限制:
"""
def job1():
- print("I'm running on threads %s" % threading.current_thread())
+ print(f"I'm running on threads {threading.current_thread()}")
def job2():
- print("I'm running on threads %s" % threading.current_thread())
+ print(f"I'm running on threads {threading.current_thread()}")
def job3():
- print("I'm running on threads %s" % threading.current_thread())
+ print(f"I'm running on threads {threading.current_thread()}")
def run_threaded(job_func):
job_thread = threading.Thread(target=job_func)
diff --git a/chrome/chrome_helper.py b/chrome/chrome_helper.py
index 4dc6a69..c42b653 100644
--- a/chrome/chrome_helper.py
+++ b/chrome/chrome_helper.py
@@ -10,9 +10,9 @@
# https://medium.com/drunk-wis/python-selenium-chrome-browser-%E8%88%87-driver-%E6%83%B1%E4%BA%BA%E7%9A%84%E7%89%88%E6%9C%AC%E7%AE%A1%E7%90%86-cbaf1d1861ce
CHROME_DRIVER_BASE_URL = "https://chromedriver.storage.googleapis.com"
CHROME_DRIVER_FOLDER = r".\chrome"
-CHROME_DRIVER_MAPPING_FILE = r"{}\mapping.json".format(CHROME_DRIVER_FOLDER)
-CHROME_DRIVER_EXE = r"{}\chromedriver.exe".format(CHROME_DRIVER_FOLDER)
-CHROME_DRIVER_ZIP = r"{}\chromedriver_win32.zip".format(CHROME_DRIVER_FOLDER)
+CHROME_DRIVER_MAPPING_FILE = rf"{CHROME_DRIVER_FOLDER}\mapping.json"
+CHROME_DRIVER_EXE = rf"{CHROME_DRIVER_FOLDER}\chromedriver.exe"
+CHROME_DRIVER_ZIP = rf"{CHROME_DRIVER_FOLDER}\chromedriver_win32.zip"
def init_dir():
@@ -30,8 +30,7 @@ def get_chrome_driver_major_version():
def get_latest_driver_version(browser_ver):
- latest_api = "{}/LATEST_RELEASE_{}".format(
- CHROME_DRIVER_BASE_URL, browser_ver)
+ latest_api = f"{CHROME_DRIVER_BASE_URL}/LATEST_RELEASE_{browser_ver}"
logger.info(f'latest api is {latest_api}')
resp = requests.get(latest_api)
lastest_driver_version = resp.text.strip()
@@ -40,8 +39,7 @@ def get_latest_driver_version(browser_ver):
def download_driver(driver_ver, dest_folder):
- download_api = "{}/{}/chromedriver_win32.zip".format(
- CHROME_DRIVER_BASE_URL, driver_ver)
+ download_api = f"{CHROME_DRIVER_BASE_URL}/{driver_ver}/chromedriver_win32.zip"
logger.info(f'download api is {download_api}')
dest_path = os.path.join(dest_folder, os.path.basename(download_api))
resp = requests.get(download_api, stream=True, timeout=300)
@@ -57,7 +55,7 @@ def download_driver(driver_ver, dest_folder):
def unzip_driver_to_target_path(src_file, dest_path):
with zipfile.ZipFile(src_file, 'r') as zip_ref:
zip_ref.extractall(dest_path)
- logger.info("Unzip [{}] -> [{}]".format(src_file, dest_path))
+ logger.info(f"Unzip [{src_file}] -> [{dest_path}]")
def read_driver_mapping_file():
diff --git a/crawler/_env.py b/crawler/_env.py
index d90ec14..1356e99 100644
--- a/crawler/_env.py
+++ b/crawler/_env.py
@@ -8,5 +8,3 @@
if sys.getdefaultencoding() != 'utf-8':
reload(sys)
sys.setdefaultencoding('utf-8')
-else:
- pass
diff --git a/crawler/proxy/proxy.py b/crawler/proxy/proxy.py
index 42c0794..9b2c7d2 100644
--- a/crawler/proxy/proxy.py
+++ b/crawler/proxy/proxy.py
@@ -181,10 +181,7 @@ def output_file():
if output_type == '':
return
fnum = len(output_filename)
- content = []
- for i in range(fnum):
- content.append([output_head_string])
-
+ content = [[output_head_string] for _ in range(fnum)]
conn.execute("select * from `proxier` order by `active`,`type`,`speed` asc")
rs = conn.fetchall()
@@ -202,14 +199,10 @@ def output_file():
content[4].append(formatline(item)) # 透明代理
elif active == 1 and type == -1:
content[5].append(formatline(item)) # 未知类型的代理
- else:
- pass
-
for i in range(fnum):
content[i].append(output_foot_string)
- f = open(output_filename[i] + "." + output_type, 'w')
- f.write(string.join(content[i], ''))
- f.close()
+ with open(f"{output_filename[i]}.{output_type}", 'w') as f:
+ f.write(string.join(content[i], ''))
# 格式化输出每条记录
@@ -232,13 +225,9 @@ def formatitem(value, colnum):
elif value is None:
value = ''
- if colnum == 5 or colnum == 6 or colnum == 7: # time_xxxed
+ if colnum in [5, 6, 7]: # time_xxxed
value = string.atof(value)
- if value < 1:
- value = ''
- else:
- value = formattime(value)
-
+ value = '' if value < 1 else formattime(value)
if value == '' and output_type == 'htm': value = ' '
return value
@@ -252,8 +241,8 @@ def check_one_proxy(ip, port):
checkstr = target_string
timeout = target_timeout
ip = string.strip(ip)
- proxy = ip + ':' + str(port)
- proxies = {'http': 'http://' + proxy + '/'}
+    proxy = f'{ip}:{port}'
+    proxies = {'http': f'http://{proxy}/'}
opener = urllib.FancyURLopener(proxies)
opener.addheaders = [
('User-agent', 'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1)')
@@ -261,9 +250,9 @@ def check_one_proxy(ip, port):
t1 = time.time()
if (url.find("?") == -1):
- url = url + '?rnd=' + str(random.random())
+ url = f'{url}?rnd={random.random()}'
else:
- url = url + '&rnd=' + str(random.random())
+ url = f'{url}&rnd={random.random()}'
try:
f = opener.open(url)
@@ -271,13 +260,9 @@ def check_one_proxy(ip, port):
pos = s.find(checkstr)
except:
pos = -1
- pass
t2 = time.time()
timeused = t2 - t1
- if (timeused < timeout and pos > 0):
- active = 1
- else:
- active = 0
+ active = 1 if (timeused < timeout and pos > 0) else 0
update_array.append([ip, port, active, timeused])
print (len(update_array), ' of ', check_in_one_call, " ", ip, ':', port, '--', int(timeused))
@@ -291,9 +276,9 @@ def get_html(url=''):
]
t = time.time()
if (url.find("?") == -1):
- url = url + '?rnd=' + str(random.random())
+ url = f'{url}?rnd={random.random()}'
else:
- url = url + '&rnd=' + str(random.random())
+ url = f'{url}&rnd={random.random()}'
try:
f = opener.open(url)
return f.read()
@@ -310,10 +295,10 @@ def get_html(url=''):
def build_list_urls_1(page=5):
page = page + 1
- ret = []
- for i in range(1, page):
- ret.append('http://proxy4free.com/page%(num)01d.html' % {'num': i})
- return ret
+ return [
+ 'http://proxy4free.com/page%(num)01d.html' % {'num': i}
+ for i in range(1, page)
+ ]
def parse_page_1(html=''):
@@ -365,10 +350,7 @@ def parse_page_2(html=''):
port = match[1]
type = match[2]
area = match[3]
- if (type == 'Anonymous'):
- type = 1
- else:
- type = 2
+ type = 1 if (type == 'Anonymous') else 2
ret.append([ip, port, type, area])
if indebug: print ('2', ip, port, type, area)
return ret
@@ -383,10 +365,10 @@ def parse_page_2(html=''):
def build_list_urls_3(page=15):
page = page + 1
- ret = []
- for i in range(1, page):
- ret.append('http://www.samair.ru/proxy/proxy-%(num)02d.htm' % {'num': i})
- return ret
+ return [
+ 'http://www.samair.ru/proxy/proxy-%(num)02d.htm' % {'num': i}
+ for i in range(1, page)
+ ]
def parse_page_3(html=''):
@@ -403,7 +385,7 @@ def parse_page_3(html=''):
<\/tr>''', html, re.VERBOSE)
ret = []
for match in matches:
- ip = match[0] + "." + match[1] + match[2]
+ ip = f"{match[0]}.{match[1]}{match[2]}"
port = match[3]
type = match[4]
area = match[5]
@@ -429,10 +411,10 @@ def parse_page_3(html=''):
def build_list_urls_4(page=3):
page = page + 1
- ret = []
- for i in range(1, page):
- ret.append('http://www.pass-e.com/proxy/index.php?page=%(n)01d' % {'n': i})
- return ret
+ return [
+ 'http://www.pass-e.com/proxy/index.php?page=%(n)01d' % {'n': i}
+ for i in range(1, page)
+ ]
def parse_page_4(html=''):
@@ -473,10 +455,10 @@ def parse_page_4(html=''):
def build_list_urls_5(page=12):
page = page + 1
- ret = []
- for i in range(1, page):
- ret.append('http://www.ipfree.cn/index2.asp?page=%(num)01d' % {'num': i})
- return ret
+ return [
+ 'http://www.ipfree.cn/index2.asp?page=%(num)01d' % {'num': i}
+ for i in range(1, page)
+ ]
def parse_page_5(html=''):
@@ -505,10 +487,10 @@ def parse_page_5(html=''):
def build_list_urls_6(page=3):
page = page + 1
- ret = []
- for i in range(1, page):
- ret.append('http://www.cnproxy.com/proxy%(num)01d.html' % {'num': i})
- return ret
+ return [
+ 'http://www.cnproxy.com/proxy%(num)01d.html' % {'num': i}
+ for i in range(1, page)
+ ]
def parse_page_6(html=''):
@@ -522,10 +504,10 @@ def parse_page_6(html=''):
([^<]+) | #area
''', html, re.VERBOSE)
ret = []
+ type = -1 # 该网站未提供代理服务器类型
for match in matches:
ip = match[0]
port = match[1]
- type = -1 # 该网站未提供代理服务器类型
area = match[2]
if indebug: print ('6', ip, port, type, area)
area = unicode(area, 'cp936')
@@ -592,10 +574,10 @@ def parse_page_8(html=''):
def build_list_urls_9(page=6):
page = page + 1
- ret = []
- for i in range(0, page):
- ret.append('http://proxylist.sakura.ne.jp/index.htm?pages=%(n)01d' % {'n': i})
- return ret
+ return [
+ 'http://proxylist.sakura.ne.jp/index.htm?pages=%(n)01d' % {'n': i}
+ for i in range(0, page)
+ ]
def parse_page_9(html=''):
@@ -613,10 +595,7 @@ def parse_page_9(html=''):
port = match[1]
type = match[3]
area = match[2]
- if (type == 'Anonymous'):
- type = 1
- else:
- type = -1
+ type = 1 if (type == 'Anonymous') else -1
ret.append([ip, port, type, area])
if indebug: print ('9', ip, port, type, area)
return ret
@@ -631,10 +610,10 @@ def parse_page_9(html=''):
def build_list_urls_10(page=5):
page = page + 1
- ret = []
- for i in range(1, page):
- ret.append('http://www.publicproxyservers.com/page%(n)01d.html' % {'n': i})
- return ret
+ return [
+ 'http://www.publicproxyservers.com/page%(n)01d.html' % {'n': i}
+ for i in range(1, page)
+ ]
def parse_page_10(html=''):
@@ -675,13 +654,17 @@ def parse_page_10(html=''):
def build_list_urls_11(page=10):
page = page + 1
- ret = []
- for i in range(1, page):
- ret.append('http://www.my-proxy.com/list/proxy.php?list=%(n)01d' % {'n': i})
-
- ret.append('http://www.my-proxy.com/list/proxy.php?list=s1')
- ret.append('http://www.my-proxy.com/list/proxy.php?list=s2')
- ret.append('http://www.my-proxy.com/list/proxy.php?list=s3')
+ ret = [
+ 'http://www.my-proxy.com/list/proxy.php?list=%(n)01d' % {'n': i}
+ for i in range(1, page)
+ ]
+ ret.extend(
+ (
+ 'http://www.my-proxy.com/list/proxy.php?list=s1',
+ 'http://www.my-proxy.com/list/proxy.php?list=s2',
+ 'http://www.my-proxy.com/list/proxy.php?list=s3',
+ )
+ )
return ret
@@ -715,12 +698,12 @@ def parse_page_11(html=''):
def build_list_urls_12(page=4):
- ret = []
- ret.append('http://www.cybersyndrome.net/plr4.html')
- ret.append('http://www.cybersyndrome.net/pla4.html')
- ret.append('http://www.cybersyndrome.net/pld4.html')
- ret.append('http://www.cybersyndrome.net/pls4.html')
- return ret
+ return [
+ 'http://www.cybersyndrome.net/plr4.html',
+ 'http://www.cybersyndrome.net/pla4.html',
+ 'http://www.cybersyndrome.net/pld4.html',
+ 'http://www.cybersyndrome.net/pls4.html',
+ ]
def parse_page_12(html=''):
@@ -822,16 +805,16 @@ def check_proxy(index, checklist=[]):
def patch_check_proxy(threadCount, action=''):
global check_in_one_call, skip_check_in_hour, conn
threads = []
- if (action == 'checknew'): # 检查所有新加入,并且从未被检查过的
+ if action == 'checkfail':
+ orderby = ' `time_checked` asc '
+ strwhere = ' `active`=0 '
+ elif action == 'checknew':
orderby = ' `time_added` desc '
strwhere = ' `active` is null '
- elif (action == 'checkok'): # 再次检查 以前已经验证成功的 代理
+ elif action == 'checkok':
orderby = ' `time_checked` asc '
strwhere = ' `active`=1 '
- elif (action == 'checkfail'): # 再次检查以前验证失败的代理
- orderby = ' `time_checked` asc '
- strwhere = ' `active`=0 '
- else: # 检查所有的
+ else:
orderby = ' `time_checked` asc '
strwhere = ' 1=1 '
sql = """
@@ -850,11 +833,7 @@ def patch_check_proxy(threadCount, action=''):
check_in_one_call = len(rows)
# 计算每个线程将要检查的代理个数
- if len(rows) >= threadCount:
- num_in_one_thread = len(rows) / threadCount
- else:
- num_in_one_thread = 1
-
+ num_in_one_thread = len(rows) / threadCount if len(rows) >= threadCount else 1
threadCount = threadCount + 1
print ("现在开始验证以下代理服务器.....")
for index in range(1, threadCount):
@@ -874,9 +853,9 @@ def patch_check_proxy(threadCount, action=''):
def get_proxy_one_website(index):
global proxy_array
- func = 'build_list_urls_' + str(index)
- parse_func = eval('parse_page_' + str(index))
- urls = eval(func + '()')
+    func = f'build_list_urls_{index}'
+    parse_func = eval(f'parse_page_{index}')
+    urls = eval(f'{func}()')
for url in urls:
html = get_html(url)
print (url)
@@ -905,7 +884,7 @@ def get_all_proxies():
""" % {'t': formattime(last_add), 'n': skip_get_in_hour})
return
- print ("现在开始从以下" + str(web_site_count) + "个网站抓取代理列表....")
+    print(f"现在开始从以下{web_site_count}个网站抓取代理列表....")
threads = []
count = web_site_count + 1
for index in range(1, count):
diff --git a/crawler/src/crawler_utils.py b/crawler/src/crawler_utils.py
index 16e3cb7..773da38 100755
--- a/crawler/src/crawler_utils.py
+++ b/crawler/src/crawler_utils.py
@@ -100,15 +100,14 @@ def parse_curl_str(self, data_as_dict=False):
elif arg.startswith('--data'):
data_str = string
- if data_as_dict:
- data_dict = {}
- pair_list = unquote(data_str).split('&')
- for pair in pair_list:
- k, v = pair.split('=')
- data_dict[k] = v
- return url, headers_dict, data_dict
- else:
+ if not data_as_dict:
return url, headers_dict, data_str
+ data_dict = {}
+ pair_list = unquote(data_str).split('&')
+ for pair in pair_list:
+ k, v = pair.split('=')
+ data_dict[k] = v
+ return url, headers_dict, data_dict
def get_url(self):
return self.parse_curl_str()[0]
@@ -177,13 +176,14 @@ def retry_get_html(*args, **kwargs):
def lazy_property(fn):
- attr_name = '_lazy_' + fn.__name__
+ attr_name = f'_lazy_{fn.__name__}'
@property
def _lazy_property(self):
if not hasattr(self, attr_name):
setattr(self, attr_name, fn(self))
return getattr(self, attr_name)
+
return _lazy_property
@@ -215,7 +215,9 @@ def form_data_to_dict(s):
def change_ip():
"""change_ip use tor as socks proxy, this command can change tor ip"""
- os.system("""(echo authenticate '"%s"'; echo signal newnym; echo quit) | nc localhost 9051"""%CONFIG.CRAWLER.PROXIES_PASSWORD)
+ os.system(
+ f"""(echo authenticate '"{CONFIG.CRAWLER.PROXIES_PASSWORD}"'; echo signal newnym; echo quit) | nc localhost 9051"""
+ )
print(my_ip())
@@ -251,11 +253,14 @@ def get_proxy_dict(ip, port, proxy_type='http' or 'socks5'):
:param port: int port
:param proxy_type: 'http' or 'socks5'
"""
- proxies = {
- 'http': '{proxy_type}://{ip}:{port}'.format(proxy_type=proxy_type, ip=ip, port=port),
- 'https': '{proxy_type}://{ip}:{port}'.format(proxy_type=proxy_type, ip=ip, port=port),
+ return {
+ 'http': '{proxy_type}://{ip}:{port}'.format(
+ proxy_type=proxy_type, ip=ip, port=port
+ ),
+ 'https': '{proxy_type}://{ip}:{port}'.format(
+ proxy_type=proxy_type, ip=ip, port=port
+ ),
}
- return proxies
def random_ip():
diff --git a/crawler/src/mul_spider.py b/crawler/src/mul_spider.py
index 91350cf..34a2b1f 100644
--- a/crawler/src/mul_spider.py
+++ b/crawler/src/mul_spider.py
@@ -26,9 +26,9 @@ def handle_page(self, url, html):
def get_page(self, url):
try:
response = yield httpclient.AsyncHTTPClient().fetch(url)
- print('######fetched %s' % url)
+ print(f'######fetched {url}')
except Exception as e:
- print('Exception: %s %s' % (e, url))
+ print(f'Exception: {e} {url}')
raise gen.Return('')
raise gen.Return(response.body)
@@ -42,7 +42,7 @@ def fetch_url():
if current_url in self._fetching:
return
- print('fetching****** %s' % current_url)
+ print(f'fetching****** {current_url}')
self._fetching.add(current_url)
html = yield self.get_page(current_url)
self._fetched.add(current_url)
@@ -75,9 +75,7 @@ def run(self):
def run_spider(beg, end):
- urls = []
- for page in range(beg, end):
- urls.append('http://www.baidu.com?&page=%d' % page)
+ urls = ['http://www.baidu.com?&page=%d' % page for page in range(beg, end)]
s = AsySpider(urls, 10)
s.run()
@@ -89,9 +87,7 @@ def main():
num = 4 # number of cpu cores
per_num, left = divmod(all_num, num)
s = range(0, all_num, per_num)
- res = []
- for i in range(len(s) - 1):
- res.append((s[i], s[i + 1]))
+ res = [(s[i], s[i + 1]) for i in range(len(s) - 1)]
res.append((s[len(s) - 1], all_num))
print(res)
diff --git a/crawler/src/proxy_req.py b/crawler/src/proxy_req.py
index 284ba23..759ed33 100644
--- a/crawler/src/proxy_req.py
+++ b/crawler/src/proxy_req.py
@@ -61,11 +61,14 @@ def get_proxy_dict(ip, port, proxy_type='http' or 'socks5'):
:param port: int port
:param proxy_type: 'http' or 'socks5'
"""
- proxies = {
- 'http': '{proxy_type}://{ip}:{port}'.format(proxy_type=proxy_type, ip=ip, port=port),
- 'https': '{proxy_type}://{ip}:{port}'.format(proxy_type=proxy_type, ip=ip, port=port),
+ return {
+ 'http': '{proxy_type}://{ip}:{port}'.format(
+ proxy_type=proxy_type, ip=ip, port=port
+ ),
+ 'https': '{proxy_type}://{ip}:{port}'.format(
+ proxy_type=proxy_type, ip=ip, port=port
+ ),
}
- return proxies
def requests_proxy(ip, port):
diff --git a/crawler/src/sync_spider.py b/crawler/src/sync_spider.py
index e92e038..1ab4976 100644
--- a/crawler/src/sync_spider.py
+++ b/crawler/src/sync_spider.py
@@ -32,9 +32,7 @@ def run(self):
def main():
st = time.time()
- urls = []
- for page in range(1, 1000):
- urls.append('http://www.jb51.net/article/%s.htm' % page)
+ urls = [f'http://www.jb51.net/article/{page}.htm' for page in range(1, 1000)]
s = MySpider(urls)
s.run()
print(time.time()-st)
diff --git a/crawler/src/test.py b/crawler/src/test.py
index 3841fa7..6f64427 100644
--- a/crawler/src/test.py
+++ b/crawler/src/test.py
@@ -12,19 +12,17 @@
class TestSyncSpider(SyncSpider):
def handle_html(self, url, html):
print(html)
- pass
class TestAsyncSpider(AsyncSpider):
def handle_html(self, url, html):
print(html)
- pass
-urls = []
-for page in range(1, 1000):
- #urls.append('http://www.jb51.net/article/%s.htm' % page)
- urls.append('http://www.imooc.com/data/check_f.php?page=%d'%page)
+urls = [
+ 'http://www.imooc.com/data/check_f.php?page=%d' % page
+ for page in range(1, 1000)
+]
def test_async():
diff --git a/crawler/src/xpath_utils.py b/crawler/src/xpath_utils.py
index f62e87e..4ef8825 100644
--- a/crawler/src/xpath_utils.py
+++ b/crawler/src/xpath_utils.py
@@ -61,19 +61,18 @@ def parser(self):
def get_field(self, field):
"""定义默认xpath解析方法,如果自定义方法,用get_作为开头覆盖默认get_field"""
- func_name = 'get_' + field
+ func_name = f'get_{field}'
xpath_str = self.xpath_dict.get(field)
if hasattr(self, func_name):
return getattr(self, func_name)(xpath_str)
- else:
- self.logger.debug(field, self.url)
- return self.parser.xpath(xpath_str)[0].strip() if xpath_str else ''
+ self.logger.debug(field, self.url)
+ return self.parser.xpath(xpath_str)[0].strip() if xpath_str else ''
def get_result(self):
- xpath_result = {}
- for field, xpath_string in self.xpath_dict.items():
- xpath_result[field] = makes(self.get_field(field)) # to utf8
- return xpath_result
+ return {
+ field: makes(self.get_field(field))
+ for field, xpath_string in self.xpath_dict.items()
+ }
def get_link(self, xpath_str=None):
return self.url
@@ -90,9 +89,7 @@ def generate_category_urls(self):
parser = etree.HTML(html)
category_xpath_str = '//*[@id="nav"]/div/div/ul//a/@href'
category_hrefs = parser.xpath(category_xpath_str)
- category_urls = []
- for href in category_hrefs:
- category_urls.append(urljoin(self.domain, href))
+ category_urls = [urljoin(self.domain, href) for href in category_hrefs]
for url in category_urls:
parser = etree.HTML(retry_get_html(url))
try:
diff --git a/crawler/toutiao/toutiao_crawler.py b/crawler/toutiao/toutiao_crawler.py
index ba94587..4296ca7 100644
--- a/crawler/toutiao/toutiao_crawler.py
+++ b/crawler/toutiao/toutiao_crawler.py
@@ -27,8 +27,7 @@ def gid():
def get_article(html):
- article = extract('', '
', html)
- return article
+ return extract('', '
', html)
def get_logo_url(html):
@@ -61,8 +60,7 @@ def parse_data(self, json_str):
media_name = each.get('media_name')
if not media_name:
continue
- site = {}
- site['name'] = each.get('media_name')
+ site = {'name': each.get('media_name')}
site['id'] = each.get('media_url')
site['gid'] = 1 # gid()
site['url'] = urlparse(each.get('url')).netloc
@@ -71,9 +69,7 @@ def parse_data(self, json_str):
site['logo'] = get_logo_url(html)
res_site.append(site)
- post = {}
- for k in post_to_get_field:
- post[k] = each.get(k)
+ post = {k: each.get(k) for k in post_to_get_field}
post['html'] = get_article(html)
post['source_gid'] = site['gid']
res_post.append(post)
diff --git a/decorator/class_decorator_with_parameters.py b/decorator/class_decorator_with_parameters.py
index 12f4619..1d52d63 100644
--- a/decorator/class_decorator_with_parameters.py
+++ b/decorator/class_decorator_with_parameters.py
@@ -12,9 +12,9 @@ def __call__(self, f):
def wrap(*args):
print('执行wrap()')
print('装饰器参数:', self.arg1, self.arg2)
- print('执行' + f.__name__ + '()')
+ print(f'执行{f.__name__}()')
f(*args)
- print(f.__name__ + '()执行完毕')
+ print(f'{f.__name__}()执行完毕')
return wrap
diff --git a/decorator/memoize.py b/decorator/memoize.py
index 0baa4ff..d735a33 100644
--- a/decorator/memoize.py
+++ b/decorator/memoize.py
@@ -15,9 +15,7 @@ def wrapper(*args):
@memoize
def fibonacci(n):
- if n <= 1:
- return n
- return fibonacci(n - 1) + fibonacci(n - 2)
+ return n if n <= 1 else fibonacci(n - 1) + fibonacci(n - 2)
if __name__ == "__main__":
diff --git a/decorator/visualize_results.py b/decorator/visualize_results.py
index ff36c19..e58d6df 100644
--- a/decorator/visualize_results.py
+++ b/decorator/visualize_results.py
@@ -19,11 +19,13 @@ def wrapper(*args, **kwargs):
@visualize_results
def analyze_and_visualize(data):
- # Your combined analysis and visualization code here
- data_DataFrame = pd.DataFrame(
- {"x1": np.random.rand(10), "x2": np.random.rand(10), "x3": np.random.rand(10)}
+ return pd.DataFrame(
+ {
+ "x1": np.random.rand(10),
+ "x2": np.random.rand(10),
+ "x3": np.random.rand(10),
+ }
)
- return data_DataFrame
if __name__ == "__main__":
diff --git a/downloader/downloader_with_progress_bar.py b/downloader/downloader_with_progress_bar.py
index 3c6d510..f843fb9 100644
--- a/downloader/downloader_with_progress_bar.py
+++ b/downloader/downloader_with_progress_bar.py
@@ -54,7 +54,7 @@ def get_file_size(url: str, raise_error: bool = False) -> int:
response = requests.head(url)
file_size = response.headers.get('Content-Length')
if file_size is None:
- if raise_error is True:
+ if raise_error:
raise ValueError('该文件不支持多线程分段下载!')
return file_size
return int(file_size)
@@ -74,13 +74,13 @@ def download(url: str, file_name: str, retry_times: int = 3, each_size=16*MB) ->
None
'''
- f = open(file_name, 'wb')
- file_size = get_file_size(url)
+ with open(file_name, 'wb') as f:
+ file_size = get_file_size(url)
- @retry(tries=retry_times)
- @multitasking.task
- def start_download(start: int, end: int) -> None:
- '''
+ @retry(tries=retry_times)
+ @multitasking.task
+ def start_download(start: int, end: int) -> None:
+ '''
根据文件起止位置下载文件
Parameters
@@ -88,45 +88,44 @@ def start_download(start: int, end: int) -> None:
start : 开始位置
end : 结束位置
'''
- _headers = headers.copy()
- # 分段下载的核心
- _headers['Range'] = f'bytes={start}-{end}'
- # 发起请求并获取响应(流式)
- response = session.get(url, headers=_headers, stream=True)
- # 每次读取的流式响应大小
- chunk_size = 128
- # 暂存已获取的响应,后续循环写入
- chunks = []
- for chunk in response.iter_content(chunk_size=chunk_size):
- # 暂存获取的响应
- chunks.append(chunk)
- # 更新进度条
- bar.update(chunk_size)
- f.seek(start)
- for chunk in chunks:
- f.write(chunk)
- # 释放已写入的资源
- del chunks
-
- session = requests.Session()
- # 分块文件如果比文件大,就取文件大小为分块大小
- each_size = min(each_size, file_size)
-
- # 分块
- parts = split_file(0, file_size, each_size)
- print(f'分块数:{len(parts)}')
- # 创建进度条
- bar = tqdm(total=file_size, desc=f'下载文件:{file_name}')
- for part in parts:
- start, end = part
- start_download(start, end)
- # 等待全部线程结束
- multitasking.wait_for_tasks()
- f.close()
+ _headers = headers.copy()
+ # 分段下载的核心
+ _headers['Range'] = f'bytes={start}-{end}'
+ # 发起请求并获取响应(流式)
+ response = session.get(url, headers=_headers, stream=True)
+ # 每次读取的流式响应大小
+ chunk_size = 128
+ # 暂存已获取的响应,后续循环写入
+ chunks = []
+ for chunk in response.iter_content(chunk_size=chunk_size):
+ # 暂存获取的响应
+ chunks.append(chunk)
+ # 更新进度条
+ bar.update(chunk_size)
+ f.seek(start)
+ for chunk in chunks:
+ f.write(chunk)
+ # 释放已写入的资源
+ del chunks
+
+ session = requests.Session()
+ # 分块文件如果比文件大,就取文件大小为分块大小
+ each_size = min(each_size, file_size)
+
+ # 分块
+ parts = split_file(0, file_size, each_size)
+ print(f'分块数:{len(parts)}')
+ # 创建进度条
+ bar = tqdm(total=file_size, desc=f'下载文件:{file_name}')
+ for part in parts:
+ start, end = part
+ start_download(start, end)
+ # 等待全部线程结束
+ multitasking.wait_for_tasks()
bar.close()
-if "__main__" == __name__:
+if __name__ == "__main__":
# url = 'https://mirrors.tuna.tsinghua.edu.cn/pypi/web/packages/0d/ea/f936c14b6e886221e53354e1992d0c4e0eb9566fcc70201047bb664ce777/tensorflow-2.3.1-cp37-cp37m-macosx_10_9_x86_64.whl#sha256=1f72edee9d2e8861edbb9e082608fd21de7113580b3fdaa4e194b472c2e196d0'
url = 'https://issuecdn.baidupcs.com/issue/netdisk/yunguanjia/BaiduNetdisk_7.2.8.9.exe'
file_name = 'BaiduNetdisk_7.2.8.9.exe'
diff --git a/excel/excel_demo.py b/excel/excel_demo.py
index ea3d6d4..a983b3d 100644
--- a/excel/excel_demo.py
+++ b/excel/excel_demo.py
@@ -35,7 +35,7 @@ def exceL(sheet):
sheet[j].expand('table').api.HorizontalAlignment = xw.constants.HAlign.xlHAlignCenter
# 设置文本水平对齐方式为居中
sheet[j].expand('table').api.VerticalAlignment = xw.constants.VAlign.xlVAlignCenter
- workbook.save(r'data\{}.xlsx'.format(i[0])) # 以名字命名
+        workbook.save(rf'data\{i[0]}.xlsx')
workbook.close()
app.quit()
diff --git a/excel/excel_operating.py b/excel/excel_operating.py
index a61de16..3b65e5b 100644
--- a/excel/excel_operating.py
+++ b/excel/excel_operating.py
@@ -19,8 +19,7 @@ def get_row_clo_num(self):
# 获取某个单元格的值
def get_cell_value(self, row, column):
- cell_value = self.ws.cell(row=row, column=column).value
- return cell_value
+ return self.ws.cell(row=row, column=column).value
# 获取某列的所有值
def get_col_value(self, column):
diff --git a/file_operation/delete_files.py b/file_operation/delete_files.py
index 4602e20..961a3fb 100644
--- a/file_operation/delete_files.py
+++ b/file_operation/delete_files.py
@@ -8,7 +8,10 @@
def backup_file(file_path):
"""备份文件"""
backup_dir = os.path.dirname(file_path)
- backup_name = os.path.basename(file_path) + '_' + datetime.datetime.now().strftime('%Y%m%d%H%M%S')
+ backup_name = (
+ f'{os.path.basename(file_path)}_'
+ + datetime.datetime.now().strftime('%Y%m%d%H%M%S')
+ )
backup_path = os.path.join(backup_dir, backup_name)
shutil.copy2(file_path, backup_path)
print(f"备份文件 {file_path} 到 {backup_path}")
@@ -41,7 +44,7 @@ def main():
if backup:
rollback = input("是否回滚文件(y/n):").lower() == 'y'
if rollback:
- backup_list = glob.glob(os.path.join(path, pattern + '_*'))
+ backup_list = glob.glob(os.path.join(path, f'{pattern}_*'))
if not backup_list:
print("没有找到备份文件")
return
diff --git a/json_loads/json_loads.py b/json_loads/json_loads.py
index 91b1089..d675849 100644
--- a/json_loads/json_loads.py
+++ b/json_loads/json_loads.py
@@ -48,11 +48,7 @@ def _byteify(data, ignore_dicts=False):
# python 3 compatible duck-typing
# if this is a unicode string, return its string representation
- if str(type(data)) == "":
- return data.encode('utf-8')
-
- # if it's anything else, return it in its original form
- return data
+    return data.encode('utf-8') if str(type(data)) == "<type 'unicode'>" else data
if __name__ == '__main__':
diff --git a/leancloud/leancloud_api.py b/leancloud/leancloud_api.py
index 551d123..f7ddb12 100644
--- a/leancloud/leancloud_api.py
+++ b/leancloud/leancloud_api.py
@@ -36,8 +36,7 @@ def get_skip_obj_list(self, skip_num=0, limit_num=30):
query.skip(skip_num * limit_num)
query.limit(limit_num)
try:
- res = query.find()
- return res
+ return query.find()
except:
traceback.print_exc()
return []
@@ -46,7 +45,7 @@ def add_img_info(self, obj_id):
query = self._query
obj = query.get(obj_id)
img_url = obj.get('File').url
- img_info_url = img_url + '?imageInfo'
+ img_info_url = f'{img_url}?imageInfo'
r = LeanCloudApi.fetch_data(img_info_url)
if not r:
return
@@ -69,8 +68,7 @@ def get_recent_obj_list(self, num):
query.descending('ID')
query.limit(num)
try:
- obj_list = query.find()
- return obj_list
+ return query.find()
except:
time.sleep(2)
obj_list = query.find()
@@ -213,7 +211,7 @@ def upload_file(self, file_abspath):
@staticmethod
def is_img_file(filename):
suffix = filename.split('.')[-1].lower() # note: remember ingore case
- img_types = set(['jpg', 'png', 'gif', 'jpeg', 'bmp'])
+ img_types = {'jpg', 'png', 'gif', 'jpeg', 'bmp'}
return suffix in img_types
@staticmethod
diff --git a/office_automation/read_data.py b/office_automation/read_data.py
index 9e3419a..7833beb 100644
--- a/office_automation/read_data.py
+++ b/office_automation/read_data.py
@@ -16,22 +16,18 @@
def read_excel(file):
- df_excel = pd.read_excel(file)
- return df_excel
+ return pd.read_excel(file)
def read_json(file):
with open(file, 'r') as json_f:
- df_json = pd.read_json(json_f)
- return df_json
+ return pd.read_json(json_f)
def read_sql(table):
- sql_cmd = 'SELECT * FROM %s' % table
- df_sql = pd.read_sql(sql_cmd, engine)
- return df_sql
+ sql_cmd = f'SELECT * FROM {table}'
+ return pd.read_sql(sql_cmd, engine)
def read_csv(file):
- df_csv = pd.read_csv(file)
- return df_csv
+ return pd.read_csv(file)
diff --git a/os/kill_process.py b/os/kill_process.py
index 43362a6..a9b21ef 100644
--- a/os/kill_process.py
+++ b/os/kill_process.py
@@ -13,5 +13,3 @@ def kill_process_name(process_name):
os.kill(pid, signal.SIGKILL)
-if __name__ == '__main__':
- pass
diff --git a/raw/parse.py b/raw/parse.py
index d6588ca..0844798 100644
--- a/raw/parse.py
+++ b/raw/parse.py
@@ -12,7 +12,7 @@ def solve_china_city():
for l in f:
l = l.strip()
# unicode.endswith
- if l.endswith(tuple(['市', '区', '县'])):
+ if l.endswith(('市', '区', '县')):
print(l[:-1])
else:
print(l)
diff --git a/tar/make_tar_file.py b/tar/make_tar_file.py
index ecdd820..65eda79 100644
--- a/tar/make_tar_file.py
+++ b/tar/make_tar_file.py
@@ -10,10 +10,7 @@
import tarfile
import logging
-if sys.version_info[0] == 2:
- input = raw_input
-else:
- input = input
+input = raw_input if sys.version_info[0] == 2 else input
formatter = '%(asctime)s %(levelname)s %(filename)s[%(lineno)d]: %(message)s'
logging.basicConfig(level=logging.DEBUG, format=formatter,
datefmt='%Y:%m:%d %H:%M:%S', encoding='utf-8')
diff --git a/text_html/dos2unix.py b/text_html/dos2unix.py
index 3558514..2abe83e 100644
--- a/text_html/dos2unix.py
+++ b/text_html/dos2unix.py
@@ -34,7 +34,7 @@ def unix2dos(data):
def confirm(file_):
- s = input('%s? ' % file_)
+ s = input(f'{file_}? ')
return s and s[0] == 'y'
@@ -74,23 +74,23 @@ def main():
nobackup = 0
interactive = 0
for k, v in opts:
- if k == '-f':
+ if k == '-b':
+ backup = v
+ elif k == '-c':
+ nobackup = 1
+ elif k == '-d':
+ copystat = shutil.copystat
+ elif k == '-f':
force = 1
+ elif k == '-i':
+ interactive = 1
elif k == '-n':
noaction = 1
verbose = 1
- elif k == '-i':
- interactive = 1
elif k == '-u':
convert = unix2dos
elif k == '-v':
verbose = 1
- elif k == '-b':
- backup = v
- elif k == '-d':
- copystat = shutil.copystat
- elif k == '-c':
- nobackup = 1
asciiregex = re.compile('[ -~\r\n\t\f]+')
for file_ in args:
if not os.path.isfile(file_) or file_[-len(backup):] == backup:
@@ -105,10 +105,9 @@ def main():
print (file_)
if not interactive or confirm(file_):
if not noaction:
- newfile = file_+'.@'
- f = open(newfile, 'w')
- f.write(newdata)
- f.close()
+ newfile = f'{file_}.@'
+ with open(newfile, 'w') as f:
+ f.write(newdata)
copystat(file_, newfile)
if backup:
backfile = file_+backup
diff --git a/text_html/html2text_tool.py b/text_html/html2text_tool.py
index 47efbec..b064334 100644
--- a/text_html/html2text_tool.py
+++ b/text_html/html2text_tool.py
@@ -12,8 +12,7 @@ def html2txt(html=u''):
import html2text # to markdown not plain text
def html2makrdown(html=u''):
- markdown = html2text.html2text(html) # html must unicode
- return markdown
+ return html2text.html2text(html)
import re
diff --git a/tkinter_demo/Menu.py b/tkinter_demo/Menu.py
index 6fe229e..44f2e31 100644
--- a/tkinter_demo/Menu.py
+++ b/tkinter_demo/Menu.py
@@ -21,7 +21,7 @@
def do_job():
global counter
- l.config(text='do ' + str(counter))
+ l.config(text=f'do {str(counter)}')
counter += 1
diff --git a/tkinter_demo/Scale.py b/tkinter_demo/Scale.py
index e2df041..686a8c4 100644
--- a/tkinter_demo/Scale.py
+++ b/tkinter_demo/Scale.py
@@ -18,7 +18,7 @@
# 第6步,定义一个触发函数功能
def print_selection(v):
- l.config(text='you have selected ' + v)
+ l.config(text=f'you have selected {v}')
# 第5步,创建一个尺度滑条,长度200字符,从0开始10结束,以2为刻度,精度为0.01,触发调用print_selection函数
diff --git a/tkinter_demo/boom.py b/tkinter_demo/boom.py
index bb140e8..14a37af 100644
--- a/tkinter_demo/boom.py
+++ b/tkinter_demo/boom.py
@@ -32,7 +32,7 @@ def boom():
if __name__ == '__main__':
threads = []
- for i in range(5):
+ for _ in range(5):
t = threading.Thread(target=boom)
threads.append(t)
t.start()
diff --git a/tkinter_demo/radiobutton.py b/tkinter_demo/radiobutton.py
index 3378908..f49710e 100644
--- a/tkinter_demo/radiobutton.py
+++ b/tkinter_demo/radiobutton.py
@@ -19,7 +19,7 @@
# 第6步,定义选项触发函数功能
def print_selection():
- l.config(text='you have selected ' + var.get())
+ l.config(text=f'you have selected {var.get()}')
# 第5步,创建三个radiobutton选项,其中variable=var, value='A'的意思就是,当我们鼠标选中了其中一个选项,把value的值A放到变量var中,然后赋值给variable
diff --git a/tkinter_demo/user_login.py b/tkinter_demo/user_login.py
index 004396b..4d55221 100644
--- a/tkinter_demo/user_login.py
+++ b/tkinter_demo/user_login.py
@@ -58,15 +58,15 @@ def usr_login():
# 如果用户名和密码与文件中的匹配成功,则会登录成功,并跳出弹窗how are you? 加上你的用户名。
if usr_name in usrs_info:
if usr_pwd == usrs_info[usr_name]:
- tkinter.messagebox.showinfo(title='Welcome', message='How are you? ' + usr_name)
- # 如果用户名匹配成功,而密码输入错误,则会弹出'Error, your password is wrong, try again.'
+ tkinter.messagebox.showinfo(
+ title='Welcome', message=f'How are you? {usr_name}'
+ )
else:
tkinter.messagebox.showerror(message='Error, your password is wrong, try again.')
- else: # 如果发现用户名不存在
- is_sign_up = tkinter.messagebox.askyesno('Welcome! ', 'You have not sign up yet. Sign up now?')
- # 提示需不需要注册新用户
- if is_sign_up:
- usr_sign_up()
+ elif is_sign_up := tkinter.messagebox.askyesno(
+ 'Welcome! ', 'You have not sign up yet. Sign up now?'
+ ):
+ usr_sign_up()
# 第9步,定义用户注册功能
diff --git a/tools/combinations.py b/tools/combinations.py
index 0e039e1..f26283d 100644
--- a/tools/combinations.py
+++ b/tools/combinations.py
@@ -5,10 +5,7 @@
# bundled in an simple command line interface.
def factorial(n):
- if n == 0:
- return 1
- else:
- return n * factorial(n - 1)
+ return 1 if n == 0 else n * factorial(n - 1)
def combinations(n, r):
diff --git a/tools/compute_duration.py b/tools/compute_duration.py
index 97b7187..a10be48 100644
--- a/tools/compute_duration.py
+++ b/tools/compute_duration.py
@@ -38,9 +38,9 @@ def main():
ftime = 0.0
for file in sorted(filelist):
clip = VideoFileClip(file)
- print("{}: {}秒".format(file, clip.duration))
+ print(f"{file}: {clip.duration}秒")
ftime += clip.duration
- print("%d seconds: " % ftime, str(datetime.timedelta(seconds=ftime)))
+ print("%d seconds: " % ftime, datetime.timedelta(seconds=ftime))
if __name__ == "__main__":
diff --git a/tools/concat_video.py b/tools/concat_video.py
index b2ea94a..efdbd6f 100644
--- a/tools/concat_video.py
+++ b/tools/concat_video.py
@@ -25,10 +25,8 @@ def read_input(filepath):
for idx, start_end_time in enumerate(lines[1:]):
pair = start_end_time.split()
start, end = pair[0], pair[1]
- part_name = 'part_' + str(idx) + '.' + suffix
- cmd = "ffmpeg -i {} -ss {} -to {} -c copy {}".format(
- input_mp4, start, end, part_name
- )
+ part_name = f'part_{str(idx)}.{suffix}'
+ cmd = f"ffmpeg -i {input_mp4} -ss {start} -to {end} -c copy {part_name}"
print(cmd)
os.system(cmd)
yield part_name
@@ -44,12 +42,12 @@ def write_part_to_file(part_list):
with open(CONCAT_FILE, 'w') as f:
for path in filepath_list:
- f.write("file '{}'\n".format(path))
+ f.write(f"file '{path}'\n")
return filepath_list
def concat_video():
- cmd = "ffmpeg -f concat -safe 0 -i {} -c copy output.mp4".format(CONCAT_FILE)
+ cmd = f"ffmpeg -f concat -safe 0 -i {CONCAT_FILE} -c copy output.mp4"
os.system(cmd)
diff --git a/tools/dataframe_draw.py b/tools/dataframe_draw.py
index 7f2c436..a0f4bb3 100755
--- a/tools/dataframe_draw.py
+++ b/tools/dataframe_draw.py
@@ -37,7 +37,5 @@ def draw():
io = StringIO()
fig.savefig(io, format='png')
img_data = base64.encodebytes(bytes(io.getvalue()))
-    return u'<img src="data:image/png;base64,{}">'.format(img_data)
+    return f'<img src="data:image/png;base64,{img_data}">'
-if __name__ == '__main__':
- pass
diff --git a/tools/find_file.py b/tools/find_file.py
index 9d50643..a458883 100644
--- a/tools/find_file.py
+++ b/tools/find_file.py
@@ -5,11 +5,7 @@
def find_files(substring, path):
- results = []
- for f in os.listdir(path):
- if substring in f:
- results.append(os.path.join(path, f))
- return results
+ return [os.path.join(path, f) for f in os.listdir(path) if substring in f]
if __name__ == '__main__':
diff --git a/tools/flask_server.py b/tools/flask_server.py
index 993c5a5..954b2ac 100644
--- a/tools/flask_server.py
+++ b/tools/flask_server.py
@@ -17,7 +17,7 @@
def abort_if_todo_doesnt_exist(todo_id):
if todo_id not in TODOS:
- abort(404, message="Todo {} doesn't exist".format(todo_id))
+ abort(404, message=f"Todo {todo_id} doesn't exist")
parser = reqparse.RequestParser()
diff --git a/tools/monitor_and_reload_chrome.py b/tools/monitor_and_reload_chrome.py
index 8feaf9e..14783e9 100755
--- a/tools/monitor_and_reload_chrome.py
+++ b/tools/monitor_and_reload_chrome.py
@@ -51,7 +51,7 @@ def monitor(directory, callback):
chrome_port = 9222
-chrome_json_url = 'http://localhost:%s/json' % (chrome_port)
+chrome_json_url = f'http://localhost:{chrome_port}/json'
refresh_json = json.dumps({
"id": 0,
"method": "Page.reload",
diff --git a/tools/remote_control_computer.py b/tools/remote_control_computer.py
index 3019671..01bed2a 100644
--- a/tools/remote_control_computer.py
+++ b/tools/remote_control_computer.py
@@ -48,9 +48,7 @@ def main():
time_space = 5
yagmail.register(username, password)
while True:
- # 读取未读邮件
- msg = read_mail(username, password)
- if msg:
+ if msg := read_mail(username, password):
if msg == 'shutdown':
shutdown()
elif msg == 'grab':
diff --git a/tools/youdao.py b/tools/youdao.py
index 4378f21..5c2e869 100755
--- a/tools/youdao.py
+++ b/tools/youdao.py
@@ -23,10 +23,9 @@ def fetch(query_str=''):
query = {
'q': query_str
}
- url = 'http://fanyi.youdao.com/openapi.do?keyfrom=11pegasus11&key=273646050&type=data&doctype=json&version=1.1&' + urlencode(query)
+ url = f'http://fanyi.youdao.com/openapi.do?keyfrom=11pegasus11&key=273646050&type=data&doctype=json&version=1.1&{urlencode(query)}'
response = urlopen(url, timeout=3)
- html = response.read().decode('utf-8')
- return html
+ return response.read().decode('utf-8')
def parse(html):
diff --git a/useful_script/random_string_generator.py b/useful_script/random_string_generator.py
index 6ede5f5..0ad78ca 100644
--- a/useful_script/random_string_generator.py
+++ b/useful_script/random_string_generator.py
@@ -5,10 +5,12 @@
def rand_string(length):
""" Generates a random string of numbers, lower- and uppercase chars. """
- return ''.join(random.choice(
- string.ascii_lowercase + string.ascii_uppercase + string.digits)
- for i in range(length)
- )
+ return ''.join(
+ random.choice(
+ string.ascii_lowercase + string.ascii_uppercase + string.digits
+ )
+ for _ in range(length)
+ )
if __name__ == '__main__':
print("Example1:", rand_string(length=4))
diff --git a/web/baidu_google_ping.py b/web/baidu_google_ping.py
index d644c92..c6a85d9 100644
--- a/web/baidu_google_ping.py
+++ b/web/baidu_google_ping.py
@@ -13,7 +13,7 @@
def ping(ping_url, site_name, site_host, post_url, rss_url):
rpc_server = xmlrpclib.ServerProxy(ping_url)
result = rpc_server.weblogUpdates.extendedPing(
- site_name, site_host, "http://"+post_url, "http://"+rss_url
+ site_name, site_host, f"http://{post_url}", f"http://{rss_url}"
)
print(result)
@@ -38,8 +38,7 @@ def main():
for item in client.listen():
print(item)
if item['type'] == 'message':
- msg = item['data']
- if msg:
+ if msg := item['data']:
ping_all( * tuple( json.loads(msg) ) )
diff --git a/web/validator_email_phone.py b/web/validator_email_phone.py
index 9a2c0ed..39c1087 100644
--- a/web/validator_email_phone.py
+++ b/web/validator_email_phone.py
@@ -19,9 +19,7 @@ class Validator(object):
@staticmethod
def validate_email(email):
- if len(email) >= 6 and (Validator.EMAIL_PAT.match(email) is not None):
- return True
- return False
+ return len(email) >= 6 and Validator.EMAIL_PAT.match(email) is not None
@staticmethod
def validate_email_list(email_list):
@@ -29,10 +27,7 @@ def validate_email_list(email_list):
"""
if not isinstance(email_list, list):
email_list = [email_list]
- for email in email_list:
- if not Validator.validate_email(email):
- return False
- return True
+ return all(Validator.validate_email(email) for email in email_list)
@staticmethod
def validate_phone(phone_number):
@@ -53,10 +48,7 @@ def validate_phone(phone_number):
def validate_phone_list(phone_number_list):
if not isinstance(phone_number_list, list):
phone_number_list = [phone_number_list]
- for phone in phone_number_list:
- if not Validator.validate_phone(phone):
- return False
- return True
+ return all(Validator.validate_phone(phone) for phone in phone_number_list)
@staticmethod
def validate(data_list, data_type):
diff --git a/z42/hg_close_branch b/z42/hg_close_branch
index 5734f7f..62d1655 100755
--- a/z42/hg_close_branch
+++ b/z42/hg_close_branch
@@ -58,20 +58,19 @@ class _GetchMacCarbon:
def __call__(self):
from Carbon import Evt
- if Evt.EventAvail(0x0008)[0] == 0: # 0x0008 is the keyDownMask
+ if Evt.EventAvail(0x0008)[0] == 0:
return ''
- else:
- #
- # The event contains the following info:
- # (what,msg,when,where,mod)=Evt.GetNextEvent(0x0008)[1]
- #
- # The message (msg) contains the ASCII char which is
- # extracted with the 0x000000FF charCodeMask; this
- # number is converted to an ASCII character with chr() and
- # returned
- #
- (what, msg, when, where, mod) = Evt.GetNextEvent(0x0008)[1]
- return chr(msg & 0x000000FF)
+ #
+ # The event contains the following info:
+ # (what,msg,when,where,mod)=Evt.GetNextEvent(0x0008)[1]
+ #
+ # The message (msg) contains the ASCII char which is
+ # extracted with the 0x000000FF charCodeMask; this
+ # number is converted to an ASCII character with chr() and
+ # returned
+ #
+ (what, msg, when, where, mod) = Evt.GetNextEvent(0x0008)[1]
+ return chr(msg & 0x000000FF)
getch = _Getch()
@@ -112,13 +111,13 @@ def main():
num = int(num) - 1
if num < 0:
break
- if num >= 0 and num < len(t):
+ if num < len(t):
branch = t[num][0]
branch_list.append(branch)
print('\n关闭分支\n%s\n(Y确认,否则取消)' % '\n'.join(branch_list))
- if not getch() in ['y', 'Y']:
+ if getch() not in ['y', 'Y']:
continue
for branch in branch_list:
@@ -133,7 +132,7 @@ hg update default
import os
success = True
for i in cmd:
- print('>>>%s' % i)
+ print(f'>>>{i}')
if os.system(i) != 0:
success = False
break
diff --git a/z42/hg_update_branch b/z42/hg_update_branch
index 9b355a8..2572b7e 100755
--- a/z42/hg_update_branch
+++ b/z42/hg_update_branch
@@ -58,20 +58,19 @@ class _GetchMacCarbon:
def __call__(self):
from Carbon import Evt
- if Evt.EventAvail(0x0008)[0] == 0: # 0x0008 is the keyDownMask
+ if Evt.EventAvail(0x0008)[0] == 0:
return ''
- else:
- #
- # The event contains the following info:
- # (what,msg,when,where,mod)=Evt.GetNextEvent(0x0008)[1]
- #
- # The message (msg) contains the ASCII char which is
- # extracted with the 0x000000FF charCodeMask; this
- # number is converted to an ASCII character with chr() and
- # returned
- #
- (what, msg, when, where, mod) = Evt.GetNextEvent(0x0008)[1]
- return chr(msg & 0x000000FF)
+ #
+ # The event contains the following info:
+ # (what,msg,when,where,mod)=Evt.GetNextEvent(0x0008)[1]
+ #
+ # The message (msg) contains the ASCII char which is
+ # extracted with the 0x000000FF charCodeMask; this
+ # number is converted to an ASCII character with chr() and
+ # returned
+ #
+ (what, msg, when, where, mod) = Evt.GetNextEvent(0x0008)[1]
+ return chr(msg & 0x000000FF)
getch = _Getch()
@@ -106,7 +105,7 @@ def main():
num = int(num) - 1
if num < 0:
break
- if num >= 0 and num < len(t):
+ if num < len(t):
branch = t[num][0]
cmd = """
hg update %s
@@ -117,7 +116,7 @@ hg update %s
import os
success = True
for i in cmd:
- print(">>>%s" % i)
+ print(f">>>{i}")
if os.system(i) != 0:
success = False
return
diff --git a/z42/print_ac_route b/z42/print_ac_route
index bf1e1c5..5169904 100755
--- a/z42/print_ac_route
+++ b/z42/print_ac_route
@@ -5,7 +5,7 @@ import zapp.ANGELCRUNCH.view._url
from zapp.ANGELCRUNCH.view._route import ROUTE_LIST
from zapp.ANGELCRUNCH.model.const import CID
-CID2NAME = dict((k, v) for v, k in CID.__dict__.iteritems() if v[0] != "_")
+CID2NAME = {k: v for v, k in CID.__dict__.iteritems() if v[0] != "_"}
for route in reversed(ROUTE_LIST):
if isinstance(route.host, (basestring, str)):
diff --git a/z42/print_route b/z42/print_route
index bf2bcec..2e3dc59 100755
--- a/z42/print_route
+++ b/z42/print_route
@@ -11,9 +11,6 @@ from zapp.ANGELCRUNCH.view._route import ROUTE_LIST
for route in reversed(ROUTE_LIST):
if isinstance(route.host, (basestring, str)):
print(route.host.replace("\\.", "."))
- else:
- pass
- # print CID2NAME[route.host.cid]
for i in route.handlers:
path, cls, params = i
print("\t %s %s.%s" % (path.ljust(35), cls.__module__, cls.__name__))
diff --git a/zip/pack_zipfile.py b/zip/pack_zipfile.py
index 6550687..ad2f248 100644
--- a/zip/pack_zipfile.py
+++ b/zip/pack_zipfile.py
@@ -10,10 +10,7 @@
import zipfile
import logging
-if sys.version_info[0] == 2:
- input = raw_input
-else:
- input = input
+input = raw_input if sys.version_info[0] == 2 else input
formatter = '%(asctime)s %(levelname)s %(filename)s[%(lineno)d]: %(message)s'
logging.basicConfig(level=logging.DEBUG, format=formatter,
datefmt='%Y:%m:%d %H:%M:%S', encoding='utf-8')