通过Repoll redis管理平台从申请、审批到集群配置上线的全过程讲解。在过程讲解中本文将附带对平台源码进行讲解。一面看redis、一面看django和python多方面一起学习!
GitHub传送门:https://github.com/NaNShaner/repoll
Redis管理平台Repoll
平台对于redis集群的申请颇为简单,输入如下图所示的各项字段,点击确认即可。报错后平台将发起审批,给到领导层进行审批。
Redis 申请页面
配套的代码讲解
实例的申请使用的是django的admin功能,未加修饰。直接上代码,功能见注释。
class ApplyRedisInfoAdmin(admin.ModelAdmin):
def get_queryset(self, request):
"""函数作用:使当前登录的用户只能看到自己负责的实例"""
qs = super(ApplyRedisInfoAdmin, self).get_queryset(request)
result = qs.filter(create_user=request.user)
if request.user.is_superuser:
return qs
return result
def save_model(self, request, obj, form, change):
"""
隐藏前端页面申请用户字段,后台自动添加用户入库
:param request: 当前wsgi
:param obj:
:param form:
:param change:
:return:
"""
obj.create_user = request.user
super().save_model(request, obj, form, change)
list_display = ['id', 'apply_ins_name', 'ins_disc', 'redis_type',
'redis_mem', 'sys_author', 'area',
'pub_date', 'apply_status']
list_filter = ['redis_type']
search_fields = ['area']
# 不可见字段
exclude = ['create_user']
list_per_page = 15
readonly_fields = ['apply_status', ]
平台对实例的审批也是十分简单,只需勾选需要申请的实例,点击“批准”或者“拒绝”即可。
Redis 审批页面
配套的代码讲解
领导当然也可以批量勾选,一次性审批多条
class RedisApplyAdmin(admin.ModelAdmin):
def has_add_permission(self, request):
"""
禁用添加按钮
:param request:
:return:
"""
return False
def has_delete_permission(self, request, obj=None):
"""
禁用删除按钮
:param request:
:param obj:
:return:
"""
return False
def get_actions(self, request):
"""
在actions中去掉‘删除’操作
:param request:
:return:
"""
actions = super(RedisApplyAdmin, self).get_actions(request)
if request.user.username[0].upper() != 'J':
if 'delete_selected' in actions:
del actions['delete_selected']
return actions
def get_queryset(self, request):
"""函数作用:使当前登录的用户只能看到自己负责的实例"""
qs = super(RedisApplyAdmin, self).get_queryset(request)
if request.user.is_superuser:
return qs
return qs.filter(create_user=RedisApply.objects.filter(create_user=request.user))
list_display = ['id', 'apply_ins_name', 'ins_disc', 'redis_type',
'redis_mem', 'sys_author', 'area',
'pub_date', 'create_user', 'apply_status']
list_filter = ['redis_type']
search_fields = ['area']
list_per_page = 15
actions = ['approve_selected_new_assets', 'deny_selected_new_assets']
def approve_selected_new_assets(self, request, queryset):
"""
用于在申请redis的界面添加一个审批通过按钮
:param request: Http Request实例
:param queryset: 勾选实例名称
:return:
"""
# 获得被打钩的checkbox对应的Redis的id编号,用于更新数据库的主键
selected = request.POST.getlist(admin.ACTION_CHECKBOX_NAME)
success_upline_number = 0
try:
for asset_id in selected:
obj = ApproveRedis(request, asset_id)
create_redis_ins = obj.create_asset()
if create_redis_ins:
success_upline_number += 1
self.message_user(request, "成功批准 %s 个新Redis实例上线!" % success_upline_number)
obj.redis_apply_status_update(statu=3)
else:
self.message_user(request, "实例为 {0} 的实例上线失败,已存在上线实例,请检查".format(obj.redis_ins_name))
except ValueError as e:
self.message_user(request, "实例为 {0} 的实例上线失败,原因为{1}".format(queryset, e))
approve_selected_new_assets.short_description = "批准选择的Redis实例"
def deny_selected_new_assets(self, request, queryset):
"""
用于在申请redis的界面添加一个审批拒绝按钮
:param request: Http Request实例
:param queryset: 勾选实例名称
:return:
"""
# 获得被打钩的checkbox对应的Redis的id编号,用于更新数据库的主键
selected = request.POST.getlist(admin.ACTION_CHECKBOX_NAME)
deny_upline_number = 0
try:
for asset_id in selected:
obj = ApproveRedis(request, asset_id)
deny_redis_ins = obj.deny_create()
if deny_redis_ins:
deny_upline_number += 1
obj.redis_apply_status_update(statu=4)
self.message_user(request, "已拒绝 %s 个新Redis实例上线!" % deny_upline_number)
else:
self.message_user(request, "操作实例为 {0} 的实例失败,已存在上线实例,请检查".format(obj.redis_ins_name))
except ValueError as e:
self.message_user(request, "操作实例为 {0} 的实例失败,原因为{1}".format(queryset, e))
deny_selected_new_assets.short_description = "拒绝选择的Redis实例"
在上面的代码中,如下一句即更新实例审批状态为“已审批”
obj.redis_apply_status_update(statu=3)
如下一句即更新实例审批状态为“已拒绝”
obj.redis_apply_status_update(statu=4)
审批后的页面
平台对于redis实例配置上线的页面是给DBA或者运维的同学准备中,对于用户的申请和领导的审批状态都是展现出来,详如下图。在实例详情的文本框中按照固定的格式输入,点击保存,即完成上线配置咯。
这块的逻辑比较复杂,代码较多,各位花点时间:
逻辑实现,使用了django的信号触发,有兴趣了解django的信号功能,可以详见django的官网
@receiver(post_save, sender=ApplyRedisText)
def apply_redis_text_handler(sender, **kwargs):
"""
触发器,前端页面在审批完后自动触发
:param sender:
:param kwargs:
:return:
"""
a = kwargs
redis_ins_id = kwargs['instance'].redis_ins_id
redis_ins_obj = RedisIns.objects.filter(id=redis_ins_id)
redis_ins_type = redis_ins_obj.values('redis_type').first()['redis_type']
redis_text = kwargs['instance'].apply_text
redis_apply_text_split = redis_apply_text(redis_text, redis_type=redis_ins_type)
redis_ins_obj_name = redis_ins_obj.values('redis_ins_name').first()
redis_ins_obj_mem = redis_ins_obj.values('redis_mem').first()
if redis_ins_type == 'Redis-Standalone':
redis_ip = redis_apply_text_split['redis_ip']
redis_port = redis_apply_text_split['redis_port']
a = RedisStandalone(redis_ins=redis_ins_obj,
redis_ins_name=redis_ins_obj_name,
redis_ins_type=redis_ins_type,
redis_ins_mem=redis_apply_text_split['redis_mem'],
redis_ip=redis_ip,
redis_port=redis_port)
a.saved_redis_running_ins()
if a.create_redis_conf_file():
redis_start = RedisStartClass(host=redis_ip,
redis_server_ctl="/opt/repoll/redis/src/redis-server /opt/repoll/conf/" + str(redis_port) + ".conf")
if redis_start.start_server():
logging.info("Redis 单实例启动成功,服务器IP:{0}, 启动端口为:{1}".format(redis_ip, redis_port))
else:
logging.info("Redis 单实例启动失败,服务器IP:{0}, 启动端口为:{1}".format(redis_ip, redis_port))
elif redis_ins_type == 'Redis-Sentinel':
b = RedisModelStartClass(model_type='Redis-Sentinel',
redis_ins=redis_ins_obj,
redis_master_ip_port=redis_apply_text_split['redis_master_ip_port'],
redis_slave_ip_port=redis_apply_text_split['redis_slave_ip_port'],
redis_master_name=redis_apply_text_split['redis_master_name'],
redis_sentinel_ip_port=redis_apply_text_split['redis_sentinel_ip_port'],
redis_sentinel_num=redis_apply_text_split['redis_sentinel_num'],
sentinel_down_after_milliseconds=30000,
sentinel_failover_timeout=180000,
sentinel_parallel_syncs=1,
redis_mem=redis_apply_text_split['redis_mem'])
create_sentinel_conf_file = b.create_sentienl_conf_file()
create_master_slave_file = b.create_maser_slave_conf()
if create_sentinel_conf_file and create_master_slave_file:
start_master = b.start_redis_master()
if start_master:
start_slave = b.start_slave_master()
if start_slave:
b.start_sentinel_master()
logging.info("哨兵模式启动成功,redis_master_ip_port:{0},"
"redis_slave_ip_port:{1},"
"redis_sentinel_ip_port:{2},"
"redis_master_name:{3}".format(redis_apply_text_split['redis_master_ip_port'],
redis_apply_text_split['redis_slave_ip_port'],
redis_apply_text_split['redis_sentinel_ip_port'],
redis_apply_text_split['redis_master_name']))
b.save_sentinel_redis_ins()
elif redis_ins_type == 'Redis-Cluster':
redis_list = []
redis_cluster_mem_sum = 0
for redis_one_ins in redis_apply_text_split:
redis_one_ins_split = {"redis_master": redis_one_ins['redis_ip_port'][0],
"redis_slave": redis_one_ins['redis_ip_port'][1:],
"redis_mem": redis_one_ins['redis_mem']}
redis_cluster_mem_sum += int(redis_one_ins['redis_mem'])
redis_list.append(redis_one_ins_split)
obj_runningins = RunningInsTime(running_ins_name=redis_ins_obj_name["redis_ins_name"],
redis_type='Redis-Cluster',
redis_ins_mem=redis_cluster_mem_sum,
ins_status=0)
obj_runningins.save()
for redis_one_ins in redis_apply_text_split:
for all_redis_ins in redis_one_ins['redis_ip_port']:
if redis_one_ins['redis_ip_port'].index(all_redis_ins) == 0:
c = RedisClusterClass(redis_ins=redis_ins_obj,
redis_ins_name=redis_ins_obj_name,
redis_ins_type="Redis-Master",
redis_ins_mem=redis_one_ins['redis_mem'],
redis_ip=all_redis_ins[0],
redis_port=all_redis_ins[1])
file_status = c.create_cluster_file()
if file_status:
c.start_all_redis_ins()
c.save_cluster_ins()
else:
c = RedisClusterClass(redis_ins=redis_ins_obj,
redis_ins_name=redis_ins_obj_name,
redis_ins_type="Redis-Slave",
redis_ins_mem=redis_one_ins['redis_mem'],
redis_ip=all_redis_ins[0],
redis_port=all_redis_ins[1])
file_status = c.create_cluster_file()
if file_status:
c.start_all_redis_ins()
c.save_cluster_ins()
RedisIns.objects.filter(redis_ins_name=redis_ins_obj_name["redis_ins_name"]).update(on_line_status=0)
start_cluster = StartRedisCluster(cluster_list=redis_list)
redis_cluster_list = start_cluster.redis_cluster_list()
start_cluster.redis_cluser_meet(redis_cluster_list)
redis_cluster_node_info = start_cluster.get_cluster_info()
if redis_cluster_node_info:
start_cluster.add_slot_2_master(redis_cluster_node_info)
else:
logging.error("redis cluster 启动失败,集群信息为{0}".format(redis_list))
按照输入的master节点,完成redis主从关系的确立
class RedisClusterClass:
def __init__(self, redis_ins, redis_ins_name, redis_ins_type, redis_ins_mem, redis_ip, redis_port):
"""
单个redis的实例进行配置文件生成、配置文件分发、进程启动
:param redis_ins:
:param redis_ins_name:
:param redis_ins_type:
:param redis_ins_mem:
:param redis_ip:
:param redis_port:
"""
self.redis_ins = [r.__dict__ for r in redis_ins]
self.redis_ins_name = redis_ins_name['redis_ins_name']
self.redis_ins_type = redis_ins_type
self.redis_ins_mem = redis_ins_mem
self.redis_ip = redis_ip
self.redis_port = redis_port
def create_cluster_file(self):
"""
创建redis实例的配置文件,并分发到资源池服务器指定的目录下。分发文件支持ssh免密和用户密码校验
使用用户密码校验目前是硬编码,后续优化支持读库
:return:
"""
redis_conf = get_redis_conf("Redis-Standalone")
redis_cluster_conf = get_redis_conf("Redis-Cluster")
all_redis_conf = [conf_k_v.__dict__ for conf_k_v in redis_conf]
all_cluster_conf = [conf_k_v.__dict__ for conf_k_v in redis_cluster_conf]
conf_file_name = "{0}/templates/".format(TEMPLATES_DIR) + str(self.redis_port) + "-cluser.conf"
with open(conf_file_name, 'w+') as f:
for k, v in all_redis_conf[0].items():
if k != 'id' and k != 'redis_version' and k != 'redis_type':
if isinstance(v, str) or isinstance(v, int):
k, v = regx_redis_conf(key=k, value=v, port=self.redis_port,
maxmemory=mem_unit_chage(self.redis_ins_mem))
f.write(k + " " + str(v) + "\\n")
for k, v in all_cluster_conf[0].items():
if k != 'id' and k != 'redis_version' and k != 'redis_type':
if isinstance(v, str) or isinstance(v, int):
k, v = regx_redis_conf(key=k, value=v, port=self.redis_port,
maxmemory=mem_unit_chage(self.redis_ins_mem),
kwargs={"redis_port": self.redis_port})
f.write(k + " " + str(v) + "\\n")
if do_scp(self.redis_ip, conf_file_name, "/opt/repoll/conf/" + str(self.redis_port) + "-cluster.conf",
user_name="root", user_password="Pass@word"):
logging.info("目标服务器{0}文件分发成功".format("/opt/repoll/conf/" + str(self.redis_port) + "-cluster.conf"))
else:
logging.error("目标服务器{0}文件分发失败".format("/opt/repoll/conf/" + str(self.redis_port) + "-cluster.conf"))
return False
return True
def start_all_redis_ins(self):
redis_start = RedisStartClass(host=self.redis_ip,
redis_server_ctl="/opt/repoll/redis/src/redis-server /opt/repoll/conf/" +
str(self.redis_port) + "-cluster.conf")
if redis_start.start_server():
logging.info("redis 实例{2}启动成功,ip:port: {0}:{1}".format(self.redis_ip, self.redis_port, self.redis_ins_name))
return True
else:
logging.info("redis 实例{2}启动失败,ip:port: {0}:{1}".format(self.redis_ip, self.redis_port, self.redis_ins_name))
return False
def save_cluster_ins(self):
obj_runningins_now = RunningInsTime.objects.all().get(running_ins_name=self.redis_ins_name)
obj = RunningInsCluster(
running_ins_name=self.redis_ins_name,
redis_type=self.redis_ins_type,
running_ins_port=self.redis_port,
redis_ip=self.redis_ip,
redis_ins_mem=self.redis_ins_mem,
running_ins_standalone=obj_runningins_now
)
obj.save()
class StartRedisCluster:
def __init__(self, cluster_list):
"""
启动Redis Cluster
:param cluster_list: 所有redis实例的IP及PORT
"""
if isinstance(cluster_list, list):
self.cluster_list = cluster_list
def redis_cluster_list(self):
"""
格式化所有redis实例的IP及PORT
:return: list
"""
redis_list = []
for redis_one_list in self.cluster_list:
redis_list.append(redis_one_list['redis_master'])
for redis_slave in redis_one_list['redis_slave']:
redis_list.append(redis_slave)
return redis_list
def redis_cluser_meet(self, redis_ins_list):
"""
redis集群内所有节点完成cluster meet
:param redis_ins_list: 入参为格式化后的集群节点信息
:return:
"""
if isinstance(redis_ins_list, list):
redis_ins_list_copy = copy.deepcopy(redis_ins_list)
i = 0
try:
if i < len(redis_ins_list_copy):
i += 1
redis_ins_one = redis_ins_list_copy.pop(0)
redis_ins_list_copy.append(redis_ins_one)
redis_ins_one_ip = redis_ins_one[0]
redis_ins_one_port = redis_ins_one[1]
for redis_ins_one_by_one in redis_ins_list_copy:
redis_ins_one_by_one_ip = redis_ins_one_by_one[0]
redis_ins_one_port_port = redis_ins_one_by_one[1]
comm_line = "/opt/repoll/redis/src/redis-cli -c -h {0} -p {1} cluster meet {2} {3}".format(
redis_ins_one_ip, redis_ins_one_port, redis_ins_one_by_one_ip, redis_ins_one_port_port
)
if do_command(host=redis_ins_one_ip, commands=comm_line, user_name="root", user_password="Pass@word"):
logging.info("{0}:{1} cluster meet {2}:{3} is ok".format(
redis_ins_one_ip, redis_ins_one_port, redis_ins_one_by_one_ip, redis_ins_one_port_port))
except Exception as e:
logging.error("Redis Cluster 启动失败,涉及节点为{0},报错信息为{1}".format(self.cluster_list, e))
def get_cluster_info(self):
"""
获取cluster nodes信息,用于后续的主从关系添加
:return: 所有节点的node id及ip对应关系
"""
node_dict = {}
redis_cluster_ip_port = self.cluster_list[0]
_comm_line = "/opt/repoll/redis/src/redis-cli -c -h {0} -p {1} cluster nodes ".format(redis_cluster_ip_port['redis_master'][0], redis_cluster_ip_port['redis_master'][1])
_comm_result = do_command(host=redis_cluster_ip_port['redis_master'][0], commands=_comm_line, user_name="root", user_password="Pass@word")
if _comm_result[0] == 0:
_comm_result_list = _comm_result[1].decode('unicode-escape')
_comm_result_list = _comm_result_list.split("\\n")
node_dict = {one_line.split()[1]: one_line.split()[0] for one_line in _comm_result_list if one_line}
if node_dict:
return node_dict
else:
return None
def add_slot_2_master(self, cluster_node_info):
"""
给主节点添加slot,并完成主从节点关系
:param cluster_node_info: 入参为所有节点的node id和ip对应关系
:return:
"""
_slots = ["{0..5461}", "{5462..10922}", "{10923..16383}"]
num = 0
try:
for redis_ip_port in self.cluster_list:
redis_master_ip = redis_ip_port['redis_master']
_add_slot_comm_line = "/opt/repoll/redis/src/redis-cli -c -h {0} -p {1} cluster addslots {2}".format(redis_master_ip[0],
redis_master_ip[1],
_slots[num])
if do_command(host=redis_master_ip[0], commands=_add_slot_comm_line, user_name="root", user_password="Pass@word"):
logging.info("add slot 成功")
for redis_slave_ip in redis_ip_port['redis_slave']:
_add_master_replca_comm_line = "/opt/repoll/redis/src/redis-cli -c -h {0} -p {1} cluster replicate {2}".format(
redis_slave_ip[0],
redis_slave_ip[1],
cluster_node_info['{0}:{1}'.format(redis_master_ip[0], redis_master_ip[1])]
)
logging.info(_add_master_replca_comm_line)
if do_command(host=redis_slave_ip[0], commands=_add_master_replca_comm_line, user_name="root", user_password="Pass@word"):
logging.info("add replicate 成功")
num += 1
except IOError as e:
return False
return True
对于已运行的实例,平台目前提供启动、停止、以及qps的趋势图功能。平台的起、停功能由django rest_framework框架完成,框架中已完成了API的安全限制,非平台用户无法调用。
Redis已运行实例
Redis集群信息
redis的qps功能,目前使用的是django_crontab,尚有不完美的地方,这块代码还在优化中就不贴出来了。
如果觉得我的文章对您有用,请点赞。您的支持将鼓励我继续创作!
赞3
添加新评论0 条评论