# Working with Elasticsearch

# Connect to the ES cluster

from elasticsearch import Elasticsearch, helpers

es_host_list = ['192.168.0.1', '192.168.0.2', '192.168.0.3', '192.168.0.4']
es_username = ''
es_password = ''
es_client = Elasticsearch(hosts=es_host_list, http_auth=(es_username, es_password))

# Insert ES data

from datetime import datetime

# Bulk insert with the helpers API
documents = [{
    "_index": index_name,
    "_source": {
        'metrics': doc,
        '@timestamp': datetime.utcnow()
    }
} for doc in doc_list]
helpers.bulk(es_client, documents)
# Insert a single document
data = df.to_dict(orient='records')[0]
es_client.create(index='index_name', id='doc_id', body=data, request_timeout=120)

# Delete ES data

# Delete a single document
es_client.delete(index='index_name', id='doc_id')
# Delete a whole index, ignoring "not found" errors
es_client.indices.delete(index=index_name, ignore=[400, 404])

# Query ES data

# For aggregations (single response)
resp = es_client.search(index='xxx', body=query, request_timeout=300)
# For raw documents (scrolls through the full result set)
resp = helpers.scan(es_client, query=query, index='xxx', size=10000, request_timeout=300)
data_list = []
for doc in resp:
    data_list.append(doc.get('_source'))
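
For reference, a minimal shape for the `query` body used above; the match field and value here are hypothetical examples only:

query = {
    "query": {
        "match": {
            "metrics.name": "jalen"  # hypothetical field; adjust to your mapping
        }
    }
}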

# Update ES data

es_client.update(index=es_index, id='es_id', body={"doc": body})

# Python list slicing

# Split a list into chunks of at most 1000 items
pieces = [id_list[i:i + 1000] for i in range(0, len(id_list), 1000)]

# Starting threads

# Single thread

import threading

params = {
    'p1': '1',
    'p2': '2',
    'p3': '3'
}
action = {
    "a": method_a,
    "b": method_b,
    "c": method_c,
    "d": method_d,
    "e": method_e,
    "f": method_f,
}
# Look up the target function by name and run it in a daemon thread
thread = threading.Thread(target=action[name], kwargs=params)
thread.daemon = True
thread.start()

# Thread pool

import concurrent.futures
import math

thread_count = 20
op_size = math.ceil(len(user_list) / thread_count)
users_pieces = [user_list[i:i + op_size] for i in range(0, len(user_list), op_size)]
max_workers = len(users_pieces)
executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
kwargs = {
    'p1': 'p1',
    'p2': 'p2',
    'p3': 'p3',
    'p4': 'p4'
}
futures = []
for users in users_pieces:
    # Pass each user chunk to the worker along with the shared kwargs
    futures.append(executor.submit(loop_users_, users, **kwargs))
data_list = []
for future in futures:
    # result() blocks until the worker finishes; assumes each returns a list
    data_list.extend(future.result())
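
The same fan-out reads more safely with the executor as a context manager; a sketch assuming, as above, that loop_users_ returns a list per chunk:

data_list = []
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
    futures = [executor.submit(loop_users_, users, **kwargs) for users in users_pieces]
    for future in concurrent.futures.as_completed(futures):
        # Collect each chunk's result as it completes
        data_list.extend(future.result())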

# Database queries

from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool

uri = 'oracle://schema:passwd@127.0.0.1:1721/?service_name=aaa.jalen.com'
engine = create_engine(uri, poolclass=NullPool)
sql = "select name, age from tab_t1 where name=:name"
# Positional binding (cx_Oracle binds a sequence in order)
df = pd.read_sql_query(sql, engine, params=['jalen'])
# Named binding, matching the :name placeholder
df = pd.read_sql_query(sql, engine, params={'name': 'jalen'})

# Return JSON without reordering keys

Because of Flask's default configuration, jsonify may sort keys alphabetically, so the returned order can differ from the insertion order.

import collections

data = collections.OrderedDict()
data['a'] = 'a'
data['b'] = 'b'
return make_response(jsonify(data), 200)
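
The OrderedDict alone is not enough if Flask is still sorting; turn sorting off explicitly (config key for Flask 1.x/2.x; Flask 2.2+ also exposes app.json.sort_keys):

app.config['JSON_SORT_KEYS'] = False  # keep insertion order in jsonify output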

# Match a string containing a given substring

import re

# re.match only matches at the start of the string, so the ^ anchor is redundant;
# use re.search to test whether the pattern occurs anywhere
re.match('^(CHINA|CHINESE)', country_names)

# Execute a stored procedure

def get_procedure_data(param1, param2, param3):
    logger.info('Run procedure and get data...')
    connection = engine.raw_connection()
    try:
        cursor = connection.cursor()
        cursor.callproc("get_somedata", [param1, param2])
        sql = "select * from some_table where col=:param3"
        # Named bind, so pass the parameter as a dict
        df = pd.read_sql_query(sql, engine, params={'param3': param3})
        cursor.close()
        connection.commit()
        return df.to_dict(orient='records')
    except Exception as e:
        logger.error(getattr(e, 'message', repr(e)))
        logger.error(traceback.format_exc())
    finally:
        connection.close()

# Flask logging configuration

import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(process)d:%(thread)d %(filename)s:%(lineno)d:%(levelname)s:%(message)s")
logger = logging.getLogger(__name__)

# Database query: fuzzy match on all fields with pagination

def serialize(model, append_columns=None):
    # Avoid a mutable default argument
    append_columns = append_columns or []
    if append_columns:
        columns = [c.key for c in class_mapper(model[0].__class__).columns]
        data = dict((c, getattr(model[0], c)) for c in columns)
        for index, column in enumerate(append_columns):
            data[column] = model[index + 1]
        return data
    else:
        columns = [c.key for c in class_mapper(model.__class__).columns]
        return dict((c, getattr(model, c)) for c in columns)

def list_something():
    data = request.get_json()
    page_num = data.get('page_num')
    page_size = data.get('page_size')
    search_key = data.get('search_key')
    query = db.session.query(SomeModel)
    if search_key:
        # Build an OR of ilike filters over every searchable column
        search_key_list = search_key.split(',')
        like_text_list = ["%{}%".format(sk) for sk in search_key_list]
        like_phrases = []
        name_like = [SomeModel.name.ilike(j) for j in like_text_list]
        ver_like = [SomeModel.version.ilike(j) for j in like_text_list]
        type_like = [SomeModel.type.ilike(j) for j in like_text_list]
        like_phrases.extend(name_like)
        like_phrases.extend(ver_like)
        like_phrases.extend(type_like)
        query = query.filter(or_(*like_phrases))
    res = query.order_by(desc(SomeModel.createtime), SomeModel.name)
    pagination = res.paginate(
        page=int(page_num),
        per_page=int(page_size),
        error_out=False)
    data_models = pagination.items
    data_list = [serialize(rep) for rep in data_models]
    return make_response(jsonify(data_list), 200)

# Bulk insert with pandas

from sqlalchemy.types import VARCHAR, INTEGER, DATE

# Map pandas dtypes to SQL column types
d_type = {}
for i, j in zip(df.columns, df.dtypes):
    if "object" in str(j):
        d_type.update({i: VARCHAR(64)})
    elif "int" in str(j):
        d_type.update({i: INTEGER()})
    elif "date" in str(j):
        d_type.update({i: DATE()})
df.to_sql('tb_user', engine, if_exists='append', index=False, dtype=d_type)

# InfluxDB operations

from influxdb import InfluxDBClient

db_name = 'some_db_name'
client = InfluxDBClient(host='192.168.0.1', port=8086, database=db_name)
# Delete points
params = {"db": db_name, "q": "delete from tb_some_table where id='%s'" % bid}
client.request('query', 'GET', params=params)
# Insert points: write_points expects the InfluxDB JSON point format,
# so each record must carry 'measurement' and 'fields' keys
df["time"] = '2022-12-12 00:00:00'
client.write_points(df.to_dict(orient='records'))
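
For reference, the point shape write_points expects; measurement, tag, and field names below are hypothetical:

points = [{
    "measurement": "tb_some_table",   # hypothetical measurement name
    "time": "2022-12-12T00:00:00Z",
    "tags": {"id": "some_id"},        # hypothetical tag
    "fields": {"value": 1.0}          # hypothetical field
}]
client.write_points(points)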

# Flask login

user = User.query.get(username)
login_user(user)
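
login_user only works once Flask-Login can reload the user on later requests; a minimal sketch, assuming `app` is the Flask application:

from flask_login import LoginManager

login_manager = LoginManager()
login_manager.init_app(app)

@login_manager.user_loader
def load_user(user_id):
    # Rebuild the session user from the id stored in the session cookie
    return User.query.get(user_id)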

# Iterating over a dict

# six.iteritems works on both Python 2 and 3; on Python 3 alone,
# plain some_dict.items() is enough
for key, value in six.iteritems(some_dict):
    pass

# Basic pandas operations

# Rename columns
df.rename(columns={'name1': 'name2'}, inplace=True)
# Drop columns
df = df.drop(columns=['name1'])
df.drop(columns=['name1'], inplace=True)
# Drop rows with missing values
df = df.dropna(subset=['name1', 'name2'])
# Drop duplicate rows
df = df[['name1', 'name2']].drop_duplicates()
df = df.drop_duplicates(subset=['name1', 'name2'], keep=False)
df.drop_duplicates(subset=['name1', 'name2'], keep='first', inplace=True)
# Column operations: join group values into a comma-separated string
df['name1'] = df.groupby(['name2', 'name3'], as_index=False)['name4'].transform(lambda x: ','.join(x))
# Merge two DataFrames
df = pd.merge(df1, df2, how="left", on=["name1", "name2"])
# Filtering
df = df[['name1', 'name2', 'name3', 'name4']]
df = df[df.name1 == 'xxx']
df = df[df.name1 != 'xxx']
df = df[df.name1.str.contains('xxx')]
df = df[~df.name.isin(['Jalen', 'Bob', 'Kite'])]
df = df[df.timestamp > 6000]
df = df[df.name.notnull()]
df = df[df.name.isnull()]
df = df.groupby('name1').filter(lambda df: df.dependency.notnull().values.any())
df = df[(df.name.isin(['Jalen', 'Jones'])) & ~df.age.isin([12, 22, 32])]
df = df[df['name'].apply(lambda x: 'Jalen' in x)]
df = df[df.name.str.contains("(?i)Jalen")]  # (?i) makes the match case-insensitive
df = df[df.name.str.lower() == 'jalen']
# Fill NaN
df['name1'] = df['name1'].fillna('')
df.fillna(0, inplace=True)
df.fillna('', inplace=True)
df.fillna({'age1': 0, 'age2': 0, 'name': ''}, inplace=True)
# Transform
df['name1'] = df['name2'].apply(lambda x: 'yyy' if x == 'xxx' else '')
df["name1"] = df["name2"].apply(lambda x: util.get_name(x))
df['name1'] = df.apply(lambda x: 'yyy' if x['name2'] == 'xxx' else '', axis=1)
# Append multiple DataFrames
df = pd.concat([df1, df2], sort=False)
# Split comma-separated values into multiple rows
df.reset_index(drop=True, inplace=True)
df = df.drop('name', axis=1).join(df['name'].str.split(',', expand=True).stack().reset_index(level=1, drop=True).rename('name'))
# Assignment
df["name"] = df.name.str.upper()
# Row count
count = int(df.shape[0])
count = len(df)
# Group and aggregate
df = df.groupby(by=['name'], as_index=False).size()
df = df.groupby(by=['name'], as_index=False)['value'].sum()
groups = df.groupby(['name1', 'name2', 'name3'], as_index=False)
for name, group in groups:
    # Iterating a GroupBy yields (key, sub-DataFrame) pairs
    pass
max_date = df[df.env == 'xxx']['start_date'].max()
# Keep the newest row per group
df = df.sort_values('createtime', ascending=False).groupby(by=['name'], as_index=False).head(1)  # tail(1) for the oldest
# Reset the row index
df.reset_index(drop=True, inplace=True)
# Convert a time column
df['create_time'] = pd.to_datetime(df['create_time'])
# Sorting (axis=0, the default, sorts rows; axis=1 would sort columns)
df = df.sort_values(by="age", ascending=False)
df.sort_values(by='similarity', ascending=False, inplace=True)
df.sort_values(by='time', axis=0, ascending=False, inplace=True)
df.sort_values(by='name', axis=0, ascending=False, inplace=True, na_position='first')
# Pivot
df = df.groupby([df['create_time'], df['name']], as_index=False).size().reset_index().fillna(0)
df = df.pivot_table(index='create_time', columns='name').reset_index().fillna(0)
df.columns = ['create_time' if x[0] == 'create_time' else x[1] for x in df.columns]
df.rename(index=str, columns={'create_time': 'Date'}, inplace=True)
# Send an HTML email from a DataFrame
report_html = df_report.to_html(index=False, border=1, justify='center', na_rep="", escape=False)
report_html = report_html.replace('<th>', '<th style = "background-color: #e2fcfd; width:150px">')
report_html = report_html.replace('class', 'cellspacing="0" class')
report_html = report_html.replace('<tbody>', '<tbody style="text-align:center">')
kwargs = {'title': title, 'receiver': receiver, 'body': body_header + normal_body + report_html + body_footer}
subject = "xxx"
util.send_email(subject, **kwargs)
# Read and write Excel
df = pd.read_excel(r"ddd.xlsx", engine='openpyxl', sheet_name='Sheet1')
df.to_excel("xxxx.xlsx", index=False, sheet_name='ddd')
# Read and write CSV
df = pd.read_csv(file_name)
df.to_csv(file_name, index=False)
# Extract values
age = df.loc[df.name == 'Jalen', 'age'].values[0]
val = df.iloc[0]['value']
val = df['value'].iloc[0]
# DataFrame to a list of dicts
df.to_dict(orient='records')
# To a list (DataFrame.append was removed in pandas 2.0, so use pd.concat)
data = pd.concat([df1, df2]).drop_duplicates(subset=['name', 'age'], keep=False)['name'].tolist()
# Iterate over rows
for index, row in df.iterrows():
    pass
# Build a new column by concatenation
df['name'] = df['name1'] + '-' + df['name2']

# Run a dig command

import shlex
import subprocess

cmd = 'dig xxx -t CNAME +short'
process = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = process.communicate()

# XML to dict

import xmltodict

xmltodict.parse(xml_str_content)

# Connect to Redis through Sentinel

from redis.sentinel import Sentinel

def get_redis_client():
    sentinel_ip_list = ['192.168.0.1', '192.168.0.2', '192.168.0.3']
    master_name = 'mymaster'
    master_password = 'xxx'
    sentinel_port = 26379
    conf = {
        'sentinel': [(ip, sentinel_port) for ip in sentinel_ip_list],
        'master_name': master_name,
        'connection_conf': {
            'socket_timeout': 5,
            'retry_on_timeout': True,
            'socket_keepalive': True,
            'max_connections': 10,
            'db': 0,
            'encoding': 'utf8',
            'decode_responses': True,
        }
    }
    redis_sentinel = Sentinel(conf['sentinel'], **conf['connection_conf'])
    # ip, port = redis_sentinel.discover_master(master_name)
    # logger.info('master ip: %s' % ip)
    # logger.info('master port: %s' % port)
    return redis_sentinel.master_for(master_name, password=master_password)

# String to dict

# eval works but executes arbitrary code; prefer json.loads for JSON input
data = eval(data_str)
data = json.loads(data_str)
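
When the string is a Python literal rather than JSON, ast.literal_eval from the standard library is the safe replacement for eval:

import ast

data = ast.literal_eval(data_str)  # parses literals only; never executes code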

# Using Redlock

try:
    # Take a distributed lock before saving
    lock_name = 'lock_%s' % number
    redis_client = redisutils.get_redis_client()
    save_lock = Redlock(key=lock_name, masters={redis_client}, auto_release_time=30 * 1000)
    save_lock.acquire(timeout=5)
    logger.info('Store data')
    save(body, redis_client)
    logger.info('Store end')
    save_lock.release()
except Exception as e:
    # Don't swallow errors silently; at least log them
    logger.error(getattr(e, 'message', repr(e)))
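
pottery's Redlock also works as a context manager, which releases the lock even if save() raises; a sketch assuming the same pottery Redlock as above:

with Redlock(key=lock_name, masters={redis_client}, auto_release_time=30 * 1000):
    save(body, redis_client)  # lock is released automatically on exit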

# Transactions

connection = engine.connect()
trans = connection.begin()
try:
    sql = "update tb set age=:age where name=:name"
    # Pass bind parameters as a dict (works on SQLAlchemy 1.4 and 2.0)
    connection.execute(text(sql), {"name": name, "age": age})
    trans.commit()
except:
    trans.rollback()
    raise
finally:
    connection.close()
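
SQLAlchemy's engine.begin() does the same commit/rollback bookkeeping automatically:

from sqlalchemy import text

with engine.begin() as conn:
    # Commits on success, rolls back if the block raises
    conn.execute(text("update tb set age=:age where name=:name"),
                 {"name": name, "age": age})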

# KB search

query_body = {
    'query': {
        'multi_match': {
            'query': 'query_string',
            'fields': ['f1', 'f2', 'f3', 'f4']
        }
    },
    'size': 20
}
query = es.search(index=index, body=query_body)
res = [q['_source'] for q in query['hits']['hits']]

# Storing non-Latin characters in an ISO-8859 database

# When writing
model.value = val.encode("unicode_escape").decode("utf-8") if val else ''
# When reading
df['value'] = df['value'].apply(lambda x: x.encode('ISO-8859-1').decode('unicode_escape') if x else '')

# KB homepage: turning a flat table into a tree

from collections import defaultdict

data = defaultdict(list)
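
A sketch of the grouping step this defaultdict is for; the category/title row fields are hypothetical:

rows = [{'category': 'ES', 'title': 'bulk insert'},      # hypothetical flat rows
        {'category': 'ES', 'title': 'scan query'},
        {'category': 'Pandas', 'title': 'merge'}]
for row in rows:
    # Group children under their parent key
    data[row['category']].append(row['title'])
tree = [{'name': k, 'children': v} for k, v in data.items()]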

# Merging dicts

data = {'name': 'Jalen', 'age': 12}
# dict(a, **b) builds a new dict; update() mutates in place
data1 = dict(data, **{'weight': 120, 'money': 'no'})
data.update({'fat': False, 'nickname': 'cheng'})
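
Two newer spellings of the same merge:

data2 = {**data, 'weight': 120}  # dict unpacking, Python 3.5+
data3 = data | {'weight': 120}   # merge operator, Python 3.9+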

# Bulk insert with executemany

df = df[['name1', 'name2', 'name3', 'name4']]
data_to_insert = [tuple(xi) for xi in df.values]
connection = engine.raw_connection()
cursor = connection.cursor()
# :1, :2, ... are cx_Oracle positional bind variables
cursor.executemany("insert into tb01 values (:1, :2, :3, :4)", data_to_insert)
cursor.close()
connection.commit()

# Ordering JSON data for comparison

def json_ordered(obj):
    """ref: https://stackoverflow.com/questions/25851183/how-to-compare-two-json-objects-with-the-same-elements-in-a-different-order-equa"""
    if isinstance(obj, dict):
        return sorted((k, json_ordered(v)) for k, v in obj.items())
    if isinstance(obj, list):
        return sorted(json_ordered(x) for x in obj)
    else:
        return obj
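
Usage: two payloads compare equal regardless of key or list order:

assert json_ordered({'a': [2, 1], 'b': 3}) == json_ordered({'b': 3, 'a': [1, 2]})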

# Flatten a nested dict into one level, joining keys with dots

def split_dict_keys_with_point(tmp_dict, top_key_list):
    # Returns a list of single-key dicts, one per leaf value
    collection = []
    for k, v in tmp_dict.items():
        if not isinstance(v, dict):
            if not top_key_list:
                collection.append({k: v})
            else:
                collection.append({f"{'.'.join(top_key_list)}.{k}": v})
        else:
            top_key_list.append(k)
            stack_res = split_dict_keys_with_point(v, top_key_list)
            top_key_list.pop()
            collection.extend(stack_res)
    return collection

if __name__ == '__main__':
    tmp_dict = {
        'index': 'some_index',
        'timestamp': '2022-12-12 00:00:00',
        'metrics': {
            'name': 'jalen',
            'age': 12,
            'love': 'eat',
            'properties': {
                'address': 'HeFei',
                'phone': '1888888888'
            }
        }
    }
    data = split_dict_keys_with_point(tmp_dict, [])
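    # For the dict above this returns a list of single-key dicts:
    # [{'index': 'some_index'}, {'timestamp': '2022-12-12 00:00:00'},
    #  {'metrics.name': 'jalen'}, {'metrics.age': 12}, {'metrics.love': 'eat'},
    #  {'metrics.properties.address': 'HeFei'}, {'metrics.properties.phone': '1888888888'}]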

# Formatting strings into a fixed-width text table

import pandas as pd

if __name__ == '__main__':
    data_list = [{
        'name': 'Jalen',
        'age': 12,
        'weight': 120,
        'address': 'HeFei'
    }, {
        'name': 'Jones',
        'age': 12,
        'weight': 120,
        'address': 'HeFei'
    }, {
        'name': 'Bob',
        'age': 12,
        'weight': 120,
        'address': 'HeiHe'
    }]
    df = pd.DataFrame(data_list)
    fields = list(df.columns.values)
    reports = df.values.tolist()
    # Column width: the widest cell, but at least 20 characters
    fields_length = {}
    for i in fields:
        col_width = [len(str(j)) for j in df[i].tolist()]
        fields_length[i] = max(max(col_width), 20)
    report_length = len(reports)
    lens, title = [], []
    for field in fields:
        title.append(("{:<%d}" % fields_length[field]).format(field))
        lens.append(fields_length[field])
    split_line = "+%s+" % '+'.join(["-" * i for i in lens])
    title = "|%s|" % "|".join(title)
    keep_split_line = 1
    head_msg = "%s  \n%s  \n%s  \n" % (split_line, title, split_line)
    msg = "\n%s" % head_msg
    for count, report in enumerate(reports):
        row = "|%s|" % "|".join([("{:<%d}" % lens[i]).format(str(item)) for i, item in enumerate(report)])
        msg = msg + row + "  \n" + split_line + "  \n" if keep_split_line else msg + row + "  \n"
        # Print the finished table after the last row
        if report_length == count + 1:
            print(msg)
    print('End')