import grpc import control_pb2_grpc import control_pb2 import psycopg2 import os import sys class admL1Api(): def __init__(self): self.body = "" def adminLevelOne(self,body): #获取obsid obsid=int(str(body)[0]) #获取body # body=body.split(",",1)[1] shost = os.getenv('CSST_SCALEBOX_HOST') sport = os.getenv('CSST_SCALEBOX_PORT') suser = os.getenv('CSST_SCALEBOX_USER') spwd = os.getenv('CSST_SCALEBOX_PWD') sdb = os.getenv('CSST_SCALEBOX_DATABASE') #取环境变量中模块id,需预先设定 #current_job_id=os.getenv('CSST_ADML1_APPID') # conn = psycopg2.connect(host="10.255.2.12",port=5433,user="scalebox",password="changeme",database="scalebox") conn = psycopg2.connect(host=shost,port=sport,user=suser,password=spwd,database=sdb) cursor = conn.cursor() #sql ="SELECT id,name FROM t_app ;" #根据admL1的名字查找它的jobid admsql = "SELECT id FROM t_job where name = 'admL1'" cursor.execute(admsql) admrows = cursor.fetchone() current_job_id = admrows[0] sink_job_name="" if obsid==1: admL1Api.sum_numbers(body,"mbi",current_job_id) admL1Api.sum_numbers(body,"sls2d",current_job_id) elif obsid==2: sink_job_name="mci" elif obsid==3: sink_job_name="ifs" elif obsid==4: sink_job_name="cpic" elif obsid==5: sink_job_name="hstdm" else: sink_job_name="" if sink_job_name: admL1Api.sum_numbers(body,sink_job_name,current_job_id) else: print('等待模块传输') print("执行完毕") @classmethod def sum_numbers(self,body,sink_job_name,current_job_id): #调用grpc的SendJobMessage channel = grpc.insecure_channel(os.getenv('CSST_PIPELINE_GRPC_SERVER')) stub = control_pb2_grpc.ControlServiceStub(channel) test = control_pb2.JobKey() #当前模块的id test.builtin_job_id.current_job_id = int(current_job_id) #下级模块的名字 test.builtin_job_id.sink_job_name =sink_job_name test.key_text = body reflag = stub.SendJobMessage(test) print("rowname : %s" %(sink_job_name)) print("reflag : %d " %(reflag.value)) return reflag.value if __name__ == '__main__': parameter = sys.argv body=parameter[1] w=admL1Api() w.adminLevelOne(body)