Commit 8bbaa22a authored by zxl's avatar zxl
Browse files

Initial pipeline L1 & L2

parents
CSST_ADML2_JOB_ID=2
# grpc连接参数
CSST_PIPELINE_GRPC_SERVER=127.0.0.1:50051
import grpc
import control_pb2_grpc
import control_pb2
import os
import sys
#递归调用
class admL2Api():
def __init__(self):
self.body = ""
self.a = 0
def sum_all(self,body):
#取brickid,name1,name2,name3中brickid
brickid=body.split(",",1)[0]
#取brickid后面的name1,name2,name3
body=body.split(",",1)[1]
admL2Api.sum_numbers(body,brickid)
@classmethod
def sum_numbers(self,body,brickid):
#取name1,name2,name3中第一个逗号之前字符
name=body.split(",",1)[0]
#取name1,name2,name3中第一个逗号之后字符
body=body.split(",",1)[1]
#取环境变量中模块id,需预先设定
current_job_id=os.getenv('CSST_ADML2_JOB_ID')
#调用grpc的SendJobMessage
channel = grpc.insecure_channel(os.getenv('CSST_PIPELINE_GRPC_SERVER'))
stub = control_pb2_grpc.ControlServiceStub(channel)
test = control_pb2.JobKey()
test.builtin_job_id.current_job_id = int(current_job_id)
test.builtin_job_id.sink_job_name = name
test.key_text = brickid+","+body
reflag = stub.SendJobMessage(test)
print(reflag.value)
return "执行完毕"
if __name__ == '__main__':
parameter = sys.argv
body=parameter[1]
w=admL2Api()
w.sum_all(body)
#python3 admL2.py 'brikd,name1,name2'
\ No newline at end of file
syntax = "proto3";
package scalebox;
option go_package="./;scalebox";
option java_multiple_files = true;
// option java_outer_classname = "ControlProto";
option java_package = "cn.scalebox.gprc";
import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/wrappers.proto";
service ControlService {
// //////////////////////////////////////////////////////////////////
// agent client
// //////////////////////////////////////////////////////////////////
// input : slot id('ON')
// return : job-key of the task, id==0(NULL)
// task id : 'READY/-1' -> 'QUEUED/-2'
rpc GetNextTask(google.protobuf.Int32Value) returns (TaskItem);
// input : task id('QUEUED/-2')
// return : ret_code : 0(OK), -1(task NOT FOUND), -2(WRONG STATUS)
// task_status_code:'QUEUED'/-2 -> 'RUNNING'/-3
rpc SetTaskStarted(google.protobuf.Int64Value) returns (google.protobuf.Int32Value);
// input : TaskExecMessage
// return : ret_code : 0(OK), -1(task NOT FOUND), -2(task WRONG STATUS)
// task_status_code:'RUNNING'/-3 -> 'OK'/0, ...
rpc SetTaskFinished(TaskExecMessage) returns (google.protobuf.Int32Value);
// slot exit automatically, called in agent side
// input : slot id
// return : ret_code : 0(OK), -1(slot NOT FOUND), -2(slot WRONG STATUS)
// slot : 'ON' -> 'OFF'
rpc SetSlotTerminated(google.protobuf.Int32Value) returns (google.protobuf.Int32Value);
// //////////////////////////////////////////////////////////////////
// actuator client
// //////////////////////////////////////////////////////////////////
// input: token_string ?
// job:'RUNNING' && slot:'READY'
rpc GetRunnableSlotList(google.protobuf.Empty) returns (CommandList);
// input : slot id
// return : ret_code : 0(OK), -1(slot NOT FOUND), -2(slot WRONG STATUS)
// slot: 'READY' -> 'ON'
rpc SetSlotInitialized(google.protobuf.Int32Value) returns (google.protobuf.Int32Value);
// job:'PAUSED'/'ARCHIVED' && slot:'ON'
rpc GetTerminableSlotList(google.protobuf.Empty) returns (CommandList);
// job:'RUNNING' && worker:'PAUSED'
rpc GetRunnableWorkerList(google.protobuf.Empty) returns (CommandList);
// input : worker id
// return : ret_code : 0(OK), -1(worker NOT FOUND), -2(worker WRONG STATUS)
// worker: 'PAUSED' -> 'RUNNING'
rpc SetWorkerInitialized(google.protobuf.Int32Value) returns (google.protobuf.Int32Value);
// job:'PAUSED' && worker:'RUNNING'
rpc GetTerminableWorkerList(google.protobuf.Empty) returns (CommandList);
// input : worker id, return ret_code;
// return : ret_code : 0(OK), -1(worker NOT FOUND), -2(worker WRONG STATUS)
// worker : 'RUNNING' -> 'PAUSED'
rpc SetWorkerTerminated(google.protobuf.Int32Value) returns (google.protobuf.Int32Value);
// //////////////////////////////////////////////////////////////////
// app client , called by user app.
// //////////////////////////////////////////////////////////////////
// send task-key to next job in current pipeline
// return : ret_code : 0(OK), -1(job NOT FOUND)
// task_status_code : 'INITIAL'/-9
rpc SendJobMessage(JobKey) returns (google.protobuf.Int32Value);
// rpc SendToNextJob(JobKey) returns (google.protobuf.Int32Value);
rpc SendJobMessages(JobKeys) returns (google.protobuf.Int32Value);
}
message JobKey {
message JobIdRef{
// qualified name of job
string sink_job_name=1;
int32 current_job_id=2;
}
oneof jobId {
int32 cross_app_job_id=1;
JobIdRef builtin_job_id=2;
}
string key_text=10;
}
message JobKeys {
// qualified name of job
string next_job_name=1;
repeated string key_texts=2;
int32 current_job_id=3;
// OR worker_id ?
}
message TaskItem {
int64 id=1;
string key=2;
}
message Command {
// primary key of slot/worker table
int32 id=1;
string host=2;
string command_text=3;
}
message CommandList {
repeated Command command=1;
}
message TaskExecMessage {
int32 slot=1;
int32 status_code=2;
int64 task_id=3;
int64 input_bytes=4;
int64 output_bytes=5;
google.protobuf.Timestamp t0=6;
google.protobuf.Timestamp t1=7;
google.protobuf.Timestamp t2=8;
google.protobuf.Timestamp t3=9;
google.protobuf.Timestamp t4=10;
repeated google.protobuf.Timestamp time_arr=11;
string sys_out=12;
string app_out=13;
}
#!/bin/bash
python3 /app/bin/admL2.py $1
\ No newline at end of file
import os
os.environ['CSST_PIPELINE_GRPC_SERVER']="10.255.2.11:50051"
os.environ['CSST_ADML2_JOB_ID']="2"
os.environ['current_job_id']="1"
from admL2 import admL2Api
body="brikd,name1,admL2"
admL2Api().sum_all(body)
FROM hub.cstcloud.cn/scalebox/agent
LABEL maintainer="Xiaoli Zhang<zhangxiaoli@cnic.cn>"
# 安装python
RUN apt-get update \
&& apt-get install -y python3 python3-pip \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
COPY *.sh /app/bin/
COPY *.py /app/bin/
\ No newline at end of file
IMAGE_NAME:=hub.cstcloud.cn/csst/name1:dev
build:
docker build --network=host -t $(IMAGE_NAME) .
dist:
docker save $(IMAGE_NAME) | zstdmt | pv | ssh c0 'zstd -d | docker load'
push:
docker push $(IMAGE_NAME)
run:
docker run -it --entrypoint bash $(IMAGE_NAME)
down:
docker stop $(IMAGE_NAME)
import os
import sys
if __name__ == '__main__':
parameter = sys.argv
body=parameter[1]
print(body)
\ No newline at end of file
#!/bin/bash
python /app/bin/name1.py $1
send-message $1
\ No newline at end of file
FROM hub.cstcloud.cn/scalebox/agent
LABEL maintainer="Xiaoli Zhang<zhangxiaoli@cnic.cn>"
# 安装python
RUN apt-get update \
&& apt-get install -y python3 python3-pip \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
COPY *.sh /app/bin/
COPY *.py /app/bin/
\ No newline at end of file
IMAGE_NAME:=hub.cstcloud.cn/csst/name2:dev
build:
docker build --network=host -t $(IMAGE_NAME) .
dist:
docker save $(IMAGE_NAME) | zstdmt | pv | ssh c0 'zstd -d | docker load'
push:
docker push $(IMAGE_NAME)
run:
docker run -it --entrypoint bash $(IMAGE_NAME)
down:
docker stop $(IMAGE_NAME)
import os
import sys
if __name__ == '__main__':
parameter = sys.argv
body=parameter[1]
print(body)
\ No newline at end of file
#!/bin/bash
python /app/bin/name2.py $1
send-message $1
\ No newline at end of file
CLUSTER=csst
all: reset parse up-app
reset:
cd ${HOME}/docker-scalebox/clusters/$(CLUSTER) && make && cd -
up-app:
docker exec -t database psql -Uscalebox -c "UPDATE t_app SET status='RUNNING' WHERE status='INITIAL'"
parse:
CLUSTER=$(CLUSTER) \
${HOME}/docker-scalebox/bin/parse.sh app.yaml
start:
docker run -it --rm --network host -e JOB_ID=17 -e GRPC_SERVER=localhost:50051 hub.cstcloud.cn/scalebox/base bash -c ' send-job-message filelist . '
down:
make -C ${HOME}/docker-scalebox/clusters/$(CLUSTER) down
list:
make -C ${HOME}/docker-scalebox/clusters/$(CLUSTER) list
name: csst-1.app.process
label: csst-adminL1
output_basedir: /data/csst-adminL1
cluster: csst
jobs:
admL1:
label: admin管理模块1
base_image: hub.cstcloud.cn/csst/adml1:dev
type: LONG-RUN-JOB
schedule_mode: HEAD
## key_example: Dec+5841_09_04/20200817
key_group_regex: ^(Dec.+/[0-9]+)$
# environments:
# - RSYNC_SRC=scalebox@159.226.237.136
# - ROOT_PATH=${ROOT_PATH}
# - REGEX_FILTER=^.+_000[0-7]_2bit.fits$
command: docker run -d --network=host %ENVS% %VOLUMES% %IMAGE%
sink_jobs:
- module1
#- module2
module1:
label: 模块一
base_image: hub.cstcloud.cn/csst/module1:dev
type: LONG-RUN-JOB
schedule_mode: HEAD
## key_example: Dec+5841_09_04/20200817
key_group_regex: ^(Dec.+/[0-9]+)$
# paths:
# - ${ROOT_PATH}:${ROOT_PATH}:ro
# environments:
# - RSYNC_SRC=scalebox@159.226.237.136
# - ROOT_PATH=${ROOT_PATH}
# - REGEX_FILTER=^.+_000[0-7]_2bit.fits$
command: docker run -d --network=host %ENVS% %VOLUMES% %IMAGE%
# sink_jobs:
# - admL1
# module2:
# label: 模块二
# base_image: hub.cstcloud.cn/csst/module2:dev
# type: LONG-RUN-JOB
# schedule_mode: HEAD
# ## key_example: Dec+5841_09_04/20200817
# key_group_regex: ^(Dec.+/[0-9]+)$
# paths:
# - ${ROOT_PATH}:${ROOT_PATH}:ro
# environments:
# # - RSYNC_SRC=scalebox@159.226.237.136
# - ROOT_PATH=${ROOT_PATH}
# # - REGEX_FILTER=^.+_000[0-7]_2bit.fits$
# command: docker run -d --network=host %ENVS% %VOLUMES% %IMAGE%
# # sink_jobs:
# # - admL1
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment