import grpc import control_pb2_grpc import control_pb2 import os class PyScaleboxApi(): def __init__(self): self.body = "" self.a = 0 #2级流水线接口,body:brickid,name1,name2,name3 def send_message_L2(self,body): channel = grpc.insecure_channel(os.getenv('CSST_PIPELINE_GRPC_SERVER')) stub = control_pb2_grpc.ControlServiceStub(channel) test = control_pb2.JobKey() test.cross_app_job_id = int(os.getenv('CSST_ADML2_JOB_ID')) test.key_text = body reflag = stub.SendJobMessage(test) print(reflag.value) return reflag.value #1级流水线接口,body:obsid或曝光id,moduleid def send_message_L1(self,body): channel = grpc.insecure_channel(os.getenv('CSST_PIPELINE_GRPC_SERVER')) stub = control_pb2_grpc.ControlServiceStub(channel) test = control_pb2.JobKey() test.cross_app_job_id = int(os.getenv('CSST_ADML1_JOB_ID')) print("test.cross_app_job_id = "+os.getenv('CSST_ADML1_JOB_ID')) test.key_text = body self.body = body reflag = stub.SendJobMessage(test) print(reflag.value) return reflag.value