import grpc import control_pb2_grpc import control_pb2 import psycopg2 import os import sys class admL1Api(): def __init__(self): self.body = "" def adminLevelOne(self,body): #获取obsid obsid=int(str(body)[0]) #获取body # body=body.split(",",1)[1] shost = os.getenv('CSST_SCALEBOX_HOST') sport = os.getenv('CSST_SCALEBOX_PORT') suser = os.getenv('CSST_SCALEBOX_USER') spwd = os.getenv('CSST_SCALEBOX_PWD') sdb = os.getenv('CSST_SCALEBOX_DATABASE') admL1Api.sum_numbers(body,obsid,shost,sport,suser,spwd,sdb) print("执行完毕") @classmethod def sum_numbers(self,body,obsid,shost,sport,suser,spwd,sdb): #取环境变量中模块id,需预先设定 #current_job_id=os.getenv('CSST_ADML1_APPID') #调用grpc的SendJobMessage channel = grpc.insecure_channel(os.getenv('CSST_PIPELINE_GRPC_SERVER')) stub = control_pb2_grpc.ControlServiceStub(channel) test = control_pb2.JobKey() # conn = psycopg2.connect(host="10.255.2.12",port=5433,user="scalebox",password="changeme",database="scalebox") conn = psycopg2.connect(host=shost,port=sport,user=suser,password=spwd,database=sdb) cursor = conn.cursor() current_job_id=os.getenv('JOB_ID') sql ="SELECT job_name FROM t_obs where obs_x ='{}' ;".format(obsid) cursor.execute(sql) rows = cursor.fetchall() for rowname in rows: #当前模块的id test.builtin_job_id.current_job_id = int(current_job_id) #下级模块的名字 test.builtin_job_id.sink_job_name = rowname[0] test.key_text = body reflag = stub.SendJobMessage(test) print("rowname : %s" %(rowname[0])) print("reflag : %d " %(reflag.value)) conn.commit() cursor.close() conn.close() return "执行成功" if __name__ == '__main__': parameter = sys.argv body=parameter[1] w=admL1Api() w.adminLevelOne(body)