import grpc import control_pb2_grpc import control_pb2 import psycopg2 import os import sys #递归调用 class admL1Api(): def __init__(self): self.body = "" def adminLevelOne(self,body): #获取obsid obsid=body.split(",",1)[0] #获取moduleid moduleid=body.split(",",1)[1] #根据moduleid获取模块内部jobid shost = os.getenv('CSST_SCALEBOX_HOST') sport = os.getenv('CSST_SCALEBOX_PORT') suser = os.getenv('CSST_SCALEBOX_USER') spwd = os.getenv('CSST_SCALEBOX_PWD') sdb = os.getenv('CSST_SCALEBOX_DATABASE') # conn = psycopg2.connect(host="10.255.2.11",port=5432,user="scalebox",password="changeme",database="scalebox") conn = psycopg2.connect(host=shost,port=sport,user=suser,password=spwd,database=sdb) cursor = conn.cursor() #获取流水线id app_id=int(os.getenv('CSST_ADML1_APPID')) sql ="SELECT id FROM t_job where name = %s and app= %s;" params = (moduleid,app_id,) cursor.execute(sql,params) rows = cursor.fetchone() jobId=rows[0] conn.commit() cursor.close() conn.close() #调用grpc的SendJobMessage channel = grpc.insecure_channel(os.getenv('CSST_PIPELINE_GRPC_SERVER')) stub = control_pb2_grpc.ControlServiceStub(channel) test = control_pb2.JobKey() test.cross_app_job_id = int(jobId) test.key_text = obsid reflag = stub.SendJobMessage(test) print(reflag.value) return reflag.value if __name__ == '__main__': parameter = sys.argv body=parameter[1] w=admL1Api() w.adminLevelOne(body)