"L1/dockerfiles/csst-ifs-l1-cube-adm/test/add_task.sh" did not exist on "a8f310b1f5013e9f2d6d5c0f9a60276e31f7abd9"
Commit 6f511e73 authored by Zhang Xiaoli's avatar Zhang Xiaoli
Browse files

Change the text format to LF

parent e06f46dc
FROM hub.cstcloud.cn/scalebox/agent:dev FROM hub.cstcloud.cn/scalebox/agent:0.9
LABEL maintainer="Xiaoli Zhang<zhangxiaoli@cnic.cn>" LABEL maintainer="Xiaoli Zhang<zhangxiaoli@cnic.cn>"
......
IMAGE_NAME:=hub.cstcloud.cn/csst/adml1:dev IMAGE_NAME:=csst/adml1
build: build:
docker build --network=host -t $(IMAGE_NAME) . docker build --network=host -t $(IMAGE_NAME) .
...@@ -12,3 +12,5 @@ run: ...@@ -12,3 +12,5 @@ run:
docker run -it --entrypoint bash $(IMAGE_NAME) docker run -it --entrypoint bash $(IMAGE_NAME)
down: down:
docker stop $(IMAGE_NAME) docker stop $(IMAGE_NAME)
scp:
scp -r ./ csst-zjs:/root/csst/admL1/
\ No newline at end of file
...@@ -40,7 +40,7 @@ class admL1Api(): ...@@ -40,7 +40,7 @@ class admL1Api():
test = control_pb2.JobKey() test = control_pb2.JobKey()
test.cross_app_job_id = int(jobId) test.cross_app_job_id = int(jobId)
test.key_text = obsid test.key_text = obsid
reflag = stub.SendJobMessage(test) reflag = stub.SendMessage(test)
print(reflag.value) print(reflag.value)
return reflag.value return reflag.value
if __name__ == '__main__': if __name__ == '__main__':
......
...@@ -11,98 +11,95 @@ import "google/protobuf/timestamp.proto"; ...@@ -11,98 +11,95 @@ import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto"; import "google/protobuf/empty.proto";
import "google/protobuf/wrappers.proto"; import "google/protobuf/wrappers.proto";
service ControlService { service ControlService {
// ////////////////////////////////////////////////////////////////// // //////////////////////////////////////////////////////////////////
// agent client // actuator client
// ////////////////////////////////////////////////////////////////// // //////////////////////////////////////////////////////////////////
// parameter : inline_cluster_name
rpc GetInlineSlotList(google.protobuf.StringValue) returns (InlineSlotList);
// input : slot id('ON') // parameter : external_cluster_name
// return : job-key of the task, id==0(NULL) rpc GetExternalSlotList(google.protobuf.StringValue) returns (ExternalSlotList);
// task id : 'READY/-1' -> 'QUEUED/-2'
rpc GetNextTask(google.protobuf.Int32Value) returns (TaskItem); // 'slot_run' only for the inline cluster,called by actuator
// 'agent_setup'/'agent_teardown', called by agent
// input : task id('QUEUED/-2') // rpc SetSlotRunning(SlotExecMessage) returns (google.protobuf.Empty);
// return : ret_code : 0(OK), -1(task NOT FOUND), -2(WRONG STATUS) rpc SaveClientExecInfo(ClientExecMessage) returns (google.protobuf.Empty);
// task_status_code:'QUEUED'/-2 -> 'RUNNING'/-3
rpc SetTaskStarted(google.protobuf.Int64Value) returns (google.protobuf.Int32Value);
// input : TaskExecMessage
// return : ret_code : 0(OK), -1(task NOT FOUND), -2(task WRONG STATUS)
// task_status_code:'RUNNING'/-3 -> 'OK'/0, ...
rpc SetTaskFinished(TaskExecMessage) returns (google.protobuf.Int32Value);
// slot exit automatically, called in agent side
// input : slot id
// return : ret_code : 0(OK), -1(slot NOT FOUND), -2(slot WRONG STATUS)
// slot : 'ON' -> 'OFF'
rpc SetSlotTerminated(google.protobuf.Int32Value) returns (google.protobuf.Int32Value);
// ////////////////////////////////////////////////////////////////// // //////////////////////////////////////////////////////////////////
// actuator client // agent client
// ////////////////////////////////////////////////////////////////// // //////////////////////////////////////////////////////////////////
// parameter : slot_id
// input: token_string ?
// job:'RUNNING' && slot:'READY'
rpc GetRunnableSlotList(google.protobuf.Empty) returns (CommandList);
// input : slot id
// return : ret_code : 0(OK), -1(slot NOT FOUND), -2(slot WRONG STATUS)
// slot: 'READY' -> 'ON' // slot: 'READY' -> 'ON'
rpc SetSlotInitialized(google.protobuf.Int32Value) returns (google.protobuf.Int32Value); rpc SetSlotInitialized(google.protobuf.Int32Value) returns (google.protobuf.Empty);
// job:'PAUSED'/'ARCHIVED' && slot:'ON'
rpc GetTerminableSlotList(google.protobuf.Empty) returns (CommandList);
// slot exit automatically, slot : 'ON' -> 'OFF'/'READY'
// parameter : slot_id
rpc SetSlotTerminated(google.protobuf.Int32Value) returns (google.protobuf.Empty);;
// job:'RUNNING' && worker:'PAUSED' // parameter : job_id/host_ip, sep=','
rpc GetRunnableWorkerList(google.protobuf.Empty) returns (CommandList); // return : slot id
rpc RegisterSlot(google.protobuf.StringValue) returns (google.protobuf.Int32Value);
// parameter : slot_id
rpc DeregisterSlot(google.protobuf.Int32Value) returns (google.protobuf.Empty);
// input : worker id // task id : 'READY/-1' -> 'QUEUED/-2'
// return : ret_code : 0(OK), -1(worker NOT FOUND), -2(worker WRONG STATUS) // parameter : slot_id('ON')
// worker: 'PAUSED' -> 'RUNNING' // return : job-key of the task, id==0(NULL)
rpc SetWorkerInitialized(google.protobuf.Int32Value) returns (google.protobuf.Int32Value); rpc GetNextTask(google.protobuf.Int32Value) returns (TaskItem);
// job:'PAUSED' && worker:'RUNNING' // task_status_code:'QUEUED'/-2 -> 'RUNNING'/-3
rpc GetTerminableWorkerList(google.protobuf.Empty) returns (CommandList); // parameter : task_id('QUEUED/-2')
rpc SetTaskStarted(google.protobuf.Int64Value) returns (google.protobuf.Empty);
// input : worker id, return ret_code; // task_status_code:'RUNNING'/-3 -> 'OK'/0, ...
// return : ret_code : 0(OK), -1(worker NOT FOUND), -2(worker WRONG STATUS) // parameter : TaskExecMessage
// worker : 'RUNNING' -> 'PAUSED' rpc SetTaskFinished(TaskExecMessage) returns (google.protobuf.Empty);
rpc SetWorkerTerminated(google.protobuf.Int32Value) returns (google.protobuf.Int32Value);
// ////////////////////////////////////////////////////////////////// // input : slot id
// app client , called by user app. // return : ret_code < 0 exit
// ////////////////////////////////////////////////////////////////// rpc CheckSlotHeartbeat(google.protobuf.Int32Value) returns (google.protobuf.Int32Value);
// send task-key to next job in current pipeline // send job-key to next job in current pipeline
// return : ret_code : 0(OK), -1(job NOT FOUND) // return : task_id(OK), <0 (error)
// task_status_code : 'INITIAL'/-9 // task_status_code : 'INITIAL'/-9
rpc SendJobMessage(JobKey) returns (google.protobuf.Int32Value); rpc SendMessage(JobKey) returns (google.protobuf.Int64Value);
// rpc SendToNextJob(JobKey) returns (google.protobuf.Int32Value);
// send 'START' message to head job, and set its status to running
rpc SendJobMessages(JobKeys) returns (google.protobuf.Int32Value); // rpc SetAppRunning(google.protobuf.Int32Value) returns (google.protobuf.Int32Value);
} }
message JobKey { message JobKey {
message JobIdRef{ message JobIdRef{
// qualified name of job // qualified name of job
string sink_job_name=1; string sink_job_name=1;
// for sink-job in the same app
int32 current_job_id=2; int32 current_job_id=2;
} }
message JobIdAppRef{
// qualified name of job
string sink_job_name=1;
// for sink-job in remote server (app id)
int32 app_id=2;
// <ip-addr:port> for controld/grpc-server
string remote_server=3;
}
oneof jobId { oneof jobId {
int32 cross_app_job_id=1; // for the same app
JobIdRef builtin_job_id=2; JobIdRef builtin_job_id=1;
// for different app in the same cluster
int32 cross_app_job_id=2;
// for cross-cluster app
JobIdAppRef cross_server_job_id=3;
} }
// 1. multi-messages : comma-sep
// 2. customized sep : json-format
// label: "multi-messages", text: "abc:134:345ß"
string key_text=10; string key_text=10;
}
message JobKeys { bool async_task_creation=20;
// qualified name of job map<string, string> headers = 21;
string next_job_name=1;
repeated string key_texts=2;
int32 current_job_id=3;
// OR worker_id ?
} }
message TaskItem { message TaskItem {
...@@ -110,17 +107,30 @@ message TaskItem { ...@@ -110,17 +107,30 @@ message TaskItem {
string key=2; string key=2;
} }
message Command { message InlineSlotInfo {
// primary key of slot/worker table // primary key of slot table
int32 id=1; int32 id=1;
string host=2; string host=2;
string command_text=3; int32 port=3;
string uname=4;
string command_text=5;
}
message InlineSlotList {
repeated InlineSlotInfo slots=1;
} }
message CommandList { message ExternalSlotList {
repeated Command command=1; repeated ExternalSlotInfo slots=1;
}
message ExternalSlotInfo {
int32 job_id=1;
string command_text=2;
int32 num_slots=3;
map<string,string> resource_req=4;
} }
message TaskExecMessage { message TaskExecMessage {
int32 slot=1; int32 slot=1;
int32 status_code=2; int32 status_code=2;
...@@ -132,7 +142,23 @@ message TaskExecMessage { ...@@ -132,7 +142,23 @@ message TaskExecMessage {
google.protobuf.Timestamp t2=8; google.protobuf.Timestamp t2=8;
google.protobuf.Timestamp t3=9; google.protobuf.Timestamp t3=9;
google.protobuf.Timestamp t4=10; google.protobuf.Timestamp t4=10;
repeated google.protobuf.Timestamp time_arr=11; repeated google.protobuf.Timestamp tc_arr=11;
string sys_out=12; string stdout=12;
string app_out=13; string stderr=13;
string userText=14;
}
message ClientExecMessage {
// message SlotExecMessage {
int32 slot=1;
// return code
int32 code=2;
string stdout=3;
string stderr=4;
// client start time
google.protobuf.Timestamp tc0=5;
// client end time
google.protobuf.Timestamp tc1=6;
// 'slot_run', 'agent_setup', 'agent_teardown'
string action=7;
} }
...@@ -19,42 +19,57 @@ class admL1Api(): ...@@ -19,42 +19,57 @@ class admL1Api():
suser = os.getenv('CSST_SCALEBOX_USER') suser = os.getenv('CSST_SCALEBOX_USER')
spwd = os.getenv('CSST_SCALEBOX_PWD') spwd = os.getenv('CSST_SCALEBOX_PWD')
sdb = os.getenv('CSST_SCALEBOX_DATABASE') sdb = os.getenv('CSST_SCALEBOX_DATABASE')
admL1Api.sum_numbers(body,obsid,shost,sport,suser,spwd,sdb) #取环境变量中模块id,需预先设定
#current_job_id=os.getenv('CSST_ADML1_APPID')
# conn = psycopg2.connect(host="10.255.2.12",port=5433,user="scalebox",password="changeme",database="scalebox")
conn = psycopg2.connect(host=shost,port=sport,user=suser,password=spwd,database=sdb)
cursor = conn.cursor()
#sql ="SELECT id,name FROM t_app ;"
#根据admL1的名字查找它的jobid
admsql = "SELECT id FROM t_job where name = 'admL1'"
cursor.execute(admsql)
admrows = cursor.fetchone()
current_job_id = admrows[0]
sink_job_name=""
if obsid==1:
admL1Api.sum_numbers(body,"mbi",current_job_id)
admL1Api.sum_numbers(body,"sls2d",current_job_id)
elif obsid==2:
sink_job_name="mci"
elif obsid==3:
sink_job_name="ifs"
elif obsid==4:
sink_job_name="cpic"
elif obsid==5:
sink_job_name="hstdm"
else:
sink_job_name=""
if sink_job_name:
admL1Api.sum_numbers(body,sink_job_name,current_job_id)
else:
print('等待模块传输')
print("执行完毕") print("执行完毕")
@classmethod @classmethod
def sum_numbers(self,body,obsid,shost,sport,suser,spwd,sdb): def sum_numbers(self,body,sink_job_name,current_job_id):
#取环境变量中模块id,需预先设定
#current_job_id=os.getenv('CSST_ADML1_APPID')
#调用grpc的SendJobMessage #调用grpc的SendJobMessage
channel = grpc.insecure_channel(os.getenv('CSST_PIPELINE_GRPC_SERVER')) channel = grpc.insecure_channel(os.getenv('CSST_PIPELINE_GRPC_SERVER'))
stub = control_pb2_grpc.ControlServiceStub(channel) stub = control_pb2_grpc.ControlServiceStub(channel)
test = control_pb2.JobKey() test = control_pb2.JobKey()
# conn = psycopg2.connect(host="10.255.2.12",port=5433,user="scalebox",password="changeme",database="scalebox") #当前模块的id
conn = psycopg2.connect(host=shost,port=sport,user=suser,password=spwd,database=sdb) test.builtin_job_id.current_job_id = int(current_job_id)
cursor = conn.cursor() #下级模块的名字
test.builtin_job_id.sink_job_name =sink_job_name
current_job_id=os.getenv('JOB_ID')
sql ="SELECT job_name FROM t_obs where obs_x ='{}' ;".format(obsid)
cursor.execute(sql)
rows = cursor.fetchall()
for rowname in rows:
#当前模块的id
test.builtin_job_id.current_job_id = int(current_job_id)
#下级模块的名字
test.builtin_job_id.sink_job_name = rowname[0]
test.key_text = body test.key_text = body
reflag = stub.SendJobMessage(test) reflag = stub.SendMessage(test)
print("rowname : %s" %(rowname[0])) print("rowname : %s" %(sink_job_name))
print("reflag : %d " %(reflag.value)) print("reflag : %d " %(reflag.value))
conn.commit()
cursor.close() return reflag.value
conn.close()
return "执行成功"
if __name__ == '__main__': if __name__ == '__main__':
parameter = sys.argv parameter = sys.argv
body=parameter[1] body=parameter[1]
......
...@@ -19,57 +19,42 @@ class admL1Api(): ...@@ -19,57 +19,42 @@ class admL1Api():
suser = os.getenv('CSST_SCALEBOX_USER') suser = os.getenv('CSST_SCALEBOX_USER')
spwd = os.getenv('CSST_SCALEBOX_PWD') spwd = os.getenv('CSST_SCALEBOX_PWD')
sdb = os.getenv('CSST_SCALEBOX_DATABASE') sdb = os.getenv('CSST_SCALEBOX_DATABASE')
#取环境变量中模块id,需预先设定 admL1Api.sum_numbers(body,obsid,shost,sport,suser,spwd,sdb)
#current_job_id=os.getenv('CSST_ADML1_APPID')
# conn = psycopg2.connect(host="10.255.2.12",port=5433,user="scalebox",password="changeme",database="scalebox")
conn = psycopg2.connect(host=shost,port=sport,user=suser,password=spwd,database=sdb)
cursor = conn.cursor()
#sql ="SELECT id,name FROM t_app ;"
#根据admL1的名字查找它的jobid
admsql = "SELECT id FROM t_job where name = 'admL1'"
cursor.execute(admsql)
admrows = cursor.fetchone()
current_job_id = admrows[0]
sink_job_name=""
if obsid==1:
admL1Api.sum_numbers(body,"mbi",current_job_id)
admL1Api.sum_numbers(body,"sls2d",current_job_id)
elif obsid==2:
sink_job_name="mci"
elif obsid==3:
sink_job_name="ifs"
elif obsid==4:
sink_job_name="cpic"
elif obsid==5:
sink_job_name="hstdm"
else:
sink_job_name=""
if sink_job_name:
admL1Api.sum_numbers(body,sink_job_name,current_job_id)
else:
print('等待模块传输')
print("执行完毕") print("执行完毕")
@classmethod @classmethod
def sum_numbers(self,body,sink_job_name,current_job_id): def sum_numbers(self,body,obsid,shost,sport,suser,spwd,sdb):
#取环境变量中模块id,需预先设定
#current_job_id=os.getenv('CSST_ADML1_APPID')
#调用grpc的SendJobMessage #调用grpc的SendJobMessage
channel = grpc.insecure_channel(os.getenv('CSST_PIPELINE_GRPC_SERVER')) channel = grpc.insecure_channel(os.getenv('CSST_PIPELINE_GRPC_SERVER'))
stub = control_pb2_grpc.ControlServiceStub(channel) stub = control_pb2_grpc.ControlServiceStub(channel)
test = control_pb2.JobKey() test = control_pb2.JobKey()
#当前模块的id # conn = psycopg2.connect(host="10.255.2.12",port=5433,user="scalebox",password="changeme",database="scalebox")
test.builtin_job_id.current_job_id = int(current_job_id) conn = psycopg2.connect(host=shost,port=sport,user=suser,password=spwd,database=sdb)
#下级模块的名字 cursor = conn.cursor()
test.builtin_job_id.sink_job_name =sink_job_name
test.key_text = body current_job_id=os.getenv('JOB_ID')
reflag = stub.SendJobMessage(test)
print("rowname : %s" %(sink_job_name)) sql ="SELECT job_name FROM t_obs where obs_x ='{}' ;".format(obsid)
print("reflag : %d " %(reflag.value)) cursor.execute(sql)
rows = cursor.fetchall()
return reflag.value for rowname in rows:
#当前模块的id
test.builtin_job_id.current_job_id = int(current_job_id)
#下级模块的名字
test.builtin_job_id.sink_job_name = rowname[0]
test.key_text = body
reflag = stub.SendJobMessage(test)
print("rowname : %s" %(rowname[0]))
print("reflag : %d " %(reflag.value))
conn.commit()
cursor.close()
conn.close()
return "执行成功"
if __name__ == '__main__': if __name__ == '__main__':
parameter = sys.argv parameter = sys.argv
body=parameter[1] body=parameter[1]
......
AUX_DIR=/sharewcl/L1Pipeline/aux AUX_DIR=/sharewcl/pipeline/aux
CRDS_DIR=/sharewcl/OnOrbitCal/SimData/ref_202211/products_ref20_3hdr CRDS_DIR=/sharewcl/OnOrbitCal/SimData/ref_202211/products_ref20_3hdr
DFS_ROOT=/sharewcl/dfs DFS_ROOT=/sharewcl/dfs
FROM hub.cstcloud.cn/scalebox/agent
LABEL maintainer="Xiaoli Zhang<zhangxiaoli@cnic.cn>"

# Install Python 3 and pip, then remove the apt caches in the same layer.
RUN apt-get update && \
    apt-get install -y python3 python3-pip && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

# Copy the shell and Python scripts from the build context into /app/bin/.
COPY *.sh /app/bin/
COPY *.py /app/bin/

# Mark run.sh as executable.
RUN chmod +x /app/bin/run.sh
# Image reference used by every target below.
IMAGE_NAME:=hub.cstcloud.cn/csst/module1:dev

# These targets are commands, not files. Without .PHONY, a file or directory
# named `build`, `dist`, `run`, ... would make the target look up to date.
.PHONY: build dist push run down

# Build the image from the Dockerfile in the current directory.
build:
	docker build --network=host -t $(IMAGE_NAME) .

# Stream the compressed image to host `c0` over ssh and load it there.
dist:
	docker save $(IMAGE_NAME) | zstdmt | pv | ssh c0 'zstd -d | docker load'

# Push the image to its registry.
push:
	docker push $(IMAGE_NAME)

# Start an interactive bash shell in a new container of the image.
run:
	docker run -it --entrypoint bash $(IMAGE_NAME)

# Stop every running container created from the image.
# `docker stop` accepts container names/IDs, not image references, so the
# containers are looked up by ancestor image first; nothing is run when
# there are none.
down:
	ids=$$(docker ps -q --filter ancestor=$(IMAGE_NAME)); [ -z "$$ids" ] || docker stop $$ids
import os
import sys
if __name__ == '__main__':
    # Print the message body received as the first command-line argument.
    # Without an argument, sys.argv[1] would raise IndexError; report the
    # expected usage on stderr and exit with a non-zero status instead.
    if len(sys.argv) < 2:
        sys.exit("usage: {} <message-body>".format(sys.argv[0]))
    body = sys.argv[1]
    print(body)
\ No newline at end of file
#!/bin/bash
# Run the module script on the incoming message, then pass the same message
# to `send-message`.
# "$1" is quoted so that a message containing whitespace or glob characters
# reaches both commands as one unmodified argument.
python3 /app/bin/module1.py "$1"
send-message "$1"
FROM hub.cstcloud.cn/scalebox/agent
LABEL maintainer="Xiaoli Zhang<zhangxiaoli@cnic.cn>"

# Install Python 3 and pip, then remove the apt caches in the same layer.
RUN apt-get update && \
    apt-get install -y python3 python3-pip && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

# Copy the shell and Python scripts from the build context into /app/bin/.
COPY *.sh /app/bin/
COPY *.py /app/bin/

# Mark run.sh as executable.
RUN chmod +x /app/bin/run.sh
# Image reference used by every target below.
IMAGE_NAME:=hub.cstcloud.cn/csst/module2:dev

# These targets are commands, not files. Without .PHONY, a file or directory
# named `build`, `dist`, `run`, ... would make the target look up to date.
.PHONY: build dist push run down

# Build the image from the Dockerfile in the current directory.
build:
	docker build --network=host -t $(IMAGE_NAME) .

# Stream the compressed image to host `c0` over ssh and load it there.
dist:
	docker save $(IMAGE_NAME) | zstdmt | pv | ssh c0 'zstd -d | docker load'

# Push the image to its registry.
push:
	docker push $(IMAGE_NAME)

# Start an interactive bash shell in a new container of the image.
run:
	docker run -it --entrypoint bash $(IMAGE_NAME)

# Stop every running container created from the image.
# `docker stop` accepts container names/IDs, not image references, so the
# containers are looked up by ancestor image first; nothing is run when
# there are none.
down:
	ids=$$(docker ps -q --filter ancestor=$(IMAGE_NAME)); [ -z "$$ids" ] || docker stop $$ids
import os
import sys
if __name__ == '__main__':
    # Print the message body received as the first command-line argument.
    # Without an argument, sys.argv[1] would raise IndexError; report the
    # expected usage on stderr and exit with a non-zero status instead.
    if len(sys.argv) < 2:
        sys.exit("usage: {} <message-body>".format(sys.argv[0]))
    body = sys.argv[1]
    print(body)
\ No newline at end of file
#!/bin/bash
# Run the module script on the incoming message, then pass the same message
# to `send-message`.
# "$1" is quoted so that a message containing whitespace or glob characters
# reaches both commands as one unmodified argument.
python3 /app/bin/module2.py "$1"
send-message "$1"
FROM hub.cstcloud.cn/scalebox/agent FROM hub.cstcloud.cn/scalebox/agent:0.9
# 安装redis-cli # 安装redis-cli
RUN apt-get update \ RUN apt-get update \
&& apt-get install -y redis && apt-get install -y redis
ENV REDIS_SERVER=10.0.0.9 #ENV REDIS_SERVER=10.0.0.9
#ENV REDIS_SERVER=192.169.23.2 ENV REDIS_SERVER=192.169.23.2
COPY run.sh /app/bin/ COPY run.sh /app/bin/
RUN chmod +x /app/bin/run.sh RUN chmod +x /app/bin/run.sh
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment