Commit aa8d6ce7 authored by Zhang Xiaoli

Add the upsert parameter to scalebox task add

parent 2f08dfaa
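
For context, the change applied throughout the pipeline scripts below is the addition of the --upsert flag on each scalebox task add call, as in this before/after pair taken from the mbi hunk further down (the exact semantics of --upsert, presumably insert-or-update of a task with the same key, is an assumption not confirmed by this diff):

# before
scalebox task add --header dag_run_id=${dag_run_id} "$1"
# after (this commit); --upsert assumed to update an existing task with the same key instead of adding a duplicate
scalebox task add --header dag_run_id=${dag_run_id} --upsert "$1"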
@@ -2,32 +2,17 @@ FROM hub.cstcloud.cn/scalebox/agent
LABEL maintainer="Xiaoli Zhang<zhangxiaoli@cnic.cn>"
USER root
# install python3
RUN apt-get update \
&& apt-get install -y python3 python3-pip \
&& apt-get clean \
RUN apt update \
&& apt install -y python3 \
&& apt clean \
&& rm -rf /var/lib/apt/lists/*
# install third-party packages: grpcio, protobuf, grpcio_tools, psycopg2
RUN pip install grpcio
RUN pip install protobuf
RUN pip install grpcio_tools
RUN pip install psycopg2-binary
COPY *.sh /app/bin/
COPY *.py /app/bin/
COPY *.proto /app/bin/
#controld version 202208
RUN cd /app/bin/ \
&& mkdir controld \
&& python3 -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. control.proto -I=controld \
&& chmod +x run.sh
# COPY *.proto /app/bin/
RUN chmod +x /app/bin/run.sh
ENV CSST_SCALEBOX_HOST=192.169.23.2 \
CSST_SCALEBOX_PORT=5432 \
CSST_SCALEBOX_USER=scalebox \
CSST_SCALEBOX_PWD=changeme \
CSST_SCALEBOX_DATABASE=scalebox \
CSST_PIPELINE_GRPC_SERVER="192.169.23.2:50051" \
CSST_ADML1_APPID="1"
ENV CSST_PIPELINE_GRPC_SERVER="10.3.10.28:50051"
IMAGE_NAME:=csst/adm-l1
IMAGE_PATH:=/nfsdata/tmp
build:
docker build --network=host -t $(IMAGE_NAME) .
dist:
docker save $(IMAGE_NAME) | zstdmt | pv | ssh c0 'zstd -d | docker load'
# docker save $(IMAGE_NAME) | zstdmt | pv | ssh c0 'zstd -d | docker load'
docker save $(IMAGE_NAME) > $(IMAGE_PATH)/adm-l1.tar
push:
docker push $(IMAGE_NAME)
run:
@@ -13,4 +13,4 @@ run:
down:
docker stop $(IMAGE_NAME)
scp:
scp -r ./ csst-zjs:/root/csst/adm-L1/
\ No newline at end of file
scp -r ./ scalebox-beta:~/csst/adm-L1/
\ No newline at end of file
import grpc
import control_pb2_grpc
import control_pb2
import psycopg2
import os
import sys
# recursive call
class admL1Api():
def __init__(self):
self.body = ""
def adminLevelOne(self,body):
# extract obsid
obsid=body.split(",",1)[0]
# extract moduleid
moduleid=body.split(",",1)[1]
# look up the module's internal job id from its moduleid
shost = os.getenv('CSST_SCALEBOX_HOST')
sport = os.getenv('CSST_SCALEBOX_PORT')
suser = os.getenv('CSST_SCALEBOX_USER')
spwd = os.getenv('CSST_SCALEBOX_PWD')
sdb = os.getenv('CSST_SCALEBOX_DATABASE')
# conn = psycopg2.connect(host="10.255.2.11",port=5432,user="scalebox",password="changeme",database="scalebox")
conn = psycopg2.connect(host=shost,port=sport,user=suser,password=spwd,database=sdb)
cursor = conn.cursor()
# get the pipeline (app) id
app_id=int(os.getenv('CSST_ADML1_APPID'))
sql ="SELECT id FROM t_job where name = %s and app= %s;"
params = (moduleid,app_id,)
cursor.execute(sql,params)
rows = cursor.fetchone()
jobId=rows[0]
conn.commit()
cursor.close()
conn.close()
# send the job key via the gRPC SendMessage call
channel = grpc.insecure_channel(os.getenv('CSST_PIPELINE_GRPC_SERVER'))
stub = control_pb2_grpc.ControlServiceStub(channel)
test = control_pb2.JobKey()
test.cross_app_job_id = int(jobId)
test.key_text = obsid
reflag = stub.SendMessage(test)
print(reflag.value)
return reflag.value
if __name__ == '__main__':
parameter = sys.argv
body=parameter[1]
w=admL1Api()
w.adminLevelOne(body)
\ No newline at end of file
syntax = "proto3";
package scalebox;
option go_package="./;scalebox";
option java_multiple_files = true;
// option java_outer_classname = "ControlProto";
option java_package = "cn.scalebox.gprc";
import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/wrappers.proto";
service ControlService {
// //////////////////////////////////////////////////////////////////
// actuator client
// //////////////////////////////////////////////////////////////////
// parameter : inline_cluster_name
rpc GetInlineSlotList(google.protobuf.StringValue) returns (InlineSlotList);
// parameter : external_cluster_name
rpc GetExternalSlotList(google.protobuf.StringValue) returns (ExternalSlotList);
// 'slot_run' only for the inline cluster,called by actuator
// 'agent_setup'/'agent_teardown', called by agent
// rpc SetSlotRunning(SlotExecMessage) returns (google.protobuf.Empty);
rpc SaveClientExecInfo(ClientExecMessage) returns (google.protobuf.Empty);
// //////////////////////////////////////////////////////////////////
// agent client
// //////////////////////////////////////////////////////////////////
// parameter : slot_id
// slot: 'READY' -> 'ON'
rpc SetSlotInitialized(google.protobuf.Int32Value) returns (google.protobuf.Empty);
// slot exit automatically, slot : 'ON' -> 'OFF'/'READY'
// parameter : slot_id
rpc SetSlotTerminated(google.protobuf.Int32Value) returns (google.protobuf.Empty);
// parameter : job_id/host_ip, sep=','
// return : slot id
rpc RegisterSlot(google.protobuf.StringValue) returns (google.protobuf.Int32Value);
// parameter : slot_id
rpc DeregisterSlot(google.protobuf.Int32Value) returns (google.protobuf.Empty);
// task id : 'READY/-1' -> 'QUEUED/-2'
// parameter : slot_id('ON')
// return : job-key of the task, id==0(NULL)
rpc GetNextTask(google.protobuf.Int32Value) returns (TaskItem);
// task_status_code:'QUEUED'/-2 -> 'RUNNING'/-3
// parameter : task_id('QUEUED/-2')
rpc SetTaskStarted(google.protobuf.Int64Value) returns (google.protobuf.Empty);
// task_status_code:'RUNNING'/-3 -> 'OK'/0, ...
// parameter : TaskExecMessage
rpc SetTaskFinished(TaskExecMessage) returns (google.protobuf.Empty);
// input : slot id
// return : ret_code < 0 exit
rpc CheckSlotHeartbeat(google.protobuf.Int32Value) returns (google.protobuf.Int32Value);
// send job-key to next job in current pipeline
// return : task_id(OK), <0 (error)
// task_status_code : 'INITIAL'/-9
rpc SendMessage(JobKey) returns (google.protobuf.Int64Value);
// send 'START' message to head job, and set its status to running
// rpc SetAppRunning(google.protobuf.Int32Value) returns (google.protobuf.Int32Value);
}
message JobKey {
message JobIdRef{
// qualified name of job
string sink_job_name=1;
// for sink-job in the same app
int32 current_job_id=2;
}
message JobIdAppRef{
// qualified name of job
string sink_job_name=1;
// for sink-job in remote server (app id)
int32 app_id=2;
// <ip-addr:port> for controld/grpc-server
string remote_server=3;
}
oneof jobId {
// for the same app
JobIdRef builtin_job_id=1;
// for different app in the same cluster
int32 cross_app_job_id=2;
// for cross-cluster app
JobIdAppRef cross_server_job_id=3;
}
// 1. multi-messages : comma-sep
// 2. customized sep : json-format
// label: "multi-messages", text: "abc:134:345ß"
string key_text=10;
bool async_task_creation=20;
map<string, string> headers = 21;
}
message TaskItem {
int64 id=1;
string key=2;
}
message InlineSlotInfo {
// primary key of slot table
int32 id=1;
string host=2;
int32 port=3;
string uname=4;
string command_text=5;
}
message InlineSlotList {
repeated InlineSlotInfo slots=1;
}
message ExternalSlotList {
repeated ExternalSlotInfo slots=1;
}
message ExternalSlotInfo {
int32 job_id=1;
string command_text=2;
int32 num_slots=3;
map<string,string> resource_req=4;
}
message TaskExecMessage {
int32 slot=1;
int32 status_code=2;
int64 task_id=3;
int64 input_bytes=4;
int64 output_bytes=5;
google.protobuf.Timestamp t0=6;
google.protobuf.Timestamp t1=7;
google.protobuf.Timestamp t2=8;
google.protobuf.Timestamp t3=9;
google.protobuf.Timestamp t4=10;
repeated google.protobuf.Timestamp tc_arr=11;
string stdout=12;
string stderr=13;
string userText=14;
}
message ClientExecMessage {
// message SlotExecMessage {
int32 slot=1;
// return code
int32 code=2;
string stdout=3;
string stderr=4;
// client start time
google.protobuf.Timestamp tc0=5;
// client end time
google.protobuf.Timestamp tc1=6;
// 'slot_run', 'agent_setup', 'agent_teardown'
string action=7;
}
import grpc
import control_pb2_grpc
import control_pb2
import psycopg2
import os
import sys
import subprocess
class admL1Api():
def __init__(self):
self.body = ""
@@ -12,82 +8,62 @@ class admL1Api():
def adminLevelOne(self,body):
# extract obsid
obsid=int(str(body)[0])
# extract body
# body=body.split(",",1)[1]
shost = os.getenv('CSST_SCALEBOX_HOST')
sport = os.getenv('CSST_SCALEBOX_PORT')
suser = os.getenv('CSST_SCALEBOX_USER')
spwd = os.getenv('CSST_SCALEBOX_PWD')
sdb = os.getenv('CSST_SCALEBOX_DATABASE')
# module id from environment variable, must be preset
#current_job_id=os.getenv('CSST_ADML1_APPID')
# conn = psycopg2.connect(host="10.255.2.12",port=5433,user="scalebox",password="changeme",database="scalebox")
# conn = psycopg2.connect(host=shost,port=sport,user=suser,password=spwd,database=sdb)
# cursor = conn.cursor()
# #sql ="SELECT id,name FROM t_app ;"
# # look up the job id of adm-L1 by its name
# admsql = "SELECT id FROM t_job where name = 'adm-L1'"
# cursor.execute(admsql)
# admrows = cursor.fetchone()
# current_job_id = admrows[0]
# id of the current job
current_job_id=os.getenv('JOB_ID')
print("current_job_id : %s" %(current_job_id))
sink_job_name=""
body = body.replace(' ', '-')
print("body : %s" %(body))
# body = body.replace(' ', '-')
# convert the value to a string
body_str = str(body)
# the first three characters are the obsid
obsid = body_str[:3]
print("obsid : %s" %(obsid))
# the last two characters are the chipid
chipid = body_str[-2:]
moduleid=""
if obsid == "101":
print("chipid : %s" %(chipid))
if obsid in {"101", "102", "103", "104", "105", "106", "110", "111", "112"}:
if chipid in {"06", "07", "08", "09", "11", "12", "13", "14", "15", "16", "17", "18", "19", "20", "22", "23", "24", "25"}:
admL1Api.sum_numbers(body,"msc-l1-mbi",current_job_id)
elif chipid in {"01", "02", "03", "04", "05", "10", "21", "26", "27", "28", "29", "30"}:
admL1Api.sum_numbers(body,"msc-l1-sls2d",current_job_id)
admL1Api.sum_numbers(body,"msc-l1-sls",current_job_id)
else:
print("101未发送")
elif obsid== "108":
print("无效chipid %s" %(body))
sys.exit(100)
elif obsid in {"107", "108"}:
if chipid in {"06", "07", "08", "09", "11", "12", "13", "14", "15", "16", "17", "18", "19", "20", "22", "23", "24", "25"}:
admL1Api.sum_numbers(body,"msc-l1-mbi",current_job_id)
elif chipid in {"01", "02", "03", "04", "05", "10", "21", "26", "27", "28", "29", "30"}:
admL1Api.sum_numbers(body,"msc-l1-sls2d",current_job_id)
admL1Api.sum_numbers(body,"msc-l1-sls",current_job_id)
else:
print("201未发送")
elif obsid== "110":
print("无效chipid %s" %(body))
sys.exit(100)
elif "120" <=obsid <= "129":
admL1Api.sum_numbers(body,"msc-l1-ooc",current_job_id)
else:
sink_job_name=""
print("无效obsid: %s" %(body))
sys.exit(101)
if sink_job_name:
admL1Api.sum_numbers(body,sink_job_name,current_job_id)
else:
print('waiting for module transfer')
print("execution finished")
@classmethod
def sum_numbers(self,body,sink_job_name,current_job_id):
# send the job key via the gRPC SendMessage call
channel = grpc.insecure_channel(os.getenv('CSST_PIPELINE_GRPC_SERVER'))
stub = control_pb2_grpc.ControlServiceStub(channel)
test = control_pb2.JobKey()
# id of the current job
test.builtin_job_id.current_job_id = int(current_job_id)
# name of the downstream (sink) job
test.builtin_job_id.sink_job_name =sink_job_name
test.key_text = body
reflag = stub.SendMessage(test)
print("rowname : %s" %(sink_job_name))
print("reflag : %d " %(reflag.value))
return reflag.value
print("0000000000000001")
print(sink_job_name)
print(body)
command = f"scalebox task add -sink-job={sink_job_name} {body}"
result=subprocess.run(command, shell=True)
if result.returncode == 0:
print("命令执行成功")
sys.exit(0)
else:
print(f"命令执行失败,返回码为: {result.returncode}")
return result.returncode
if __name__ == '__main__':
parameter = sys.argv
body=parameter[1]
......
import grpc
import control_pb2_grpc
import control_pb2
import psycopg2
import os
import sys
class admL1Api():
def __init__(self):
self.body = ""
def adminLevelOne(self,body):
# extract obsid
obsid=int(str(body)[0])
# extract body
# body=body.split(",",1)[1]
shost = os.getenv('CSST_SCALEBOX_HOST')
sport = os.getenv('CSST_SCALEBOX_PORT')
suser = os.getenv('CSST_SCALEBOX_USER')
spwd = os.getenv('CSST_SCALEBOX_PWD')
sdb = os.getenv('CSST_SCALEBOX_DATABASE')
admL1Api.sum_numbers(body,obsid,shost,sport,suser,spwd,sdb)
print("执行完毕")
@classmethod
def sum_numbers(self,body,obsid,shost,sport,suser,spwd,sdb):
# module id from environment variable, must be preset
#current_job_id=os.getenv('CSST_ADML1_APPID')
# send the job key via the gRPC SendJobMessage call
channel = grpc.insecure_channel(os.getenv('CSST_PIPELINE_GRPC_SERVER'))
stub = control_pb2_grpc.ControlServiceStub(channel)
test = control_pb2.JobKey()
# conn = psycopg2.connect(host="10.255.2.12",port=5433,user="scalebox",password="changeme",database="scalebox")
conn = psycopg2.connect(host=shost,port=sport,user=suser,password=spwd,database=sdb)
cursor = conn.cursor()
current_job_id=os.getenv('JOB_ID')
sql ="SELECT job_name FROM t_obs where obs_x ='{}' ;".format(obsid)
cursor.execute(sql)
rows = cursor.fetchall()
for rowname in rows:
# id of the current job
test.builtin_job_id.current_job_id = int(current_job_id)
# name of the downstream (sink) job
test.builtin_job_id.sink_job_name = rowname[0]
test.key_text = body
reflag = stub.SendJobMessage(test)
print("rowname : %s" %(rowname[0]))
print("reflag : %d " %(reflag.value))
conn.commit()
cursor.close()
conn.close()
return "执行成功"
if __name__ == '__main__':
parameter = sys.argv
body=parameter[1]
w=admL1Api()
w.adminLevelOne(body)
\ No newline at end of file
CREATE SEQUENCE t_obs_id_seq INCREMENT 1 MINVALUE 1 MAXVALUE 2147483647 START 1 CACHE 1;
CREATE TABLE "public"."t_obs" (
"id" integer DEFAULT nextval('t_obs_id_seq') NOT NULL,
"obs_x" integer NOT NULL,
"job_name" text NOT NULL,
CONSTRAINT "t_obs_pkey" PRIMARY KEY ("id")
) WITH (oids = false);
INSERT INTO "t_obs" ("id", "obs_x", "job_name") VALUES
(1, 1, 'mbi'),
(2, 1, 'sls2d'),
(3, 2, 'MCI'),
(4, 3, 'IFS'),
(5, 4, 'CPIC'),
(6, 5, 'HSTDM');
\ No newline at end of file
#!/bin/bash
python3 /app/bin/mgL1.py $1
exit_code=$?
exit $exit_code
\ No newline at end of file
import os
os.environ['CSST_PIPELINE_GRPC_SERVER']="10.255.2.11:50051"
os.environ['app_id']="1"
from admL1 import admL1Api
body="brikd,moudle1"
admL1Api().adminLevelOne(body)
FROM 10.3.10.10:8113/csst/csst-msc-l1-mbi:100356
FROM 10.3.10.10:8113/csst/csst-msc-l1-mbi
USER root
COPY run.sh /app/bin/
......
@@ -39,7 +39,7 @@ fi
if [ $exit_code -eq 0 ]; then
echo "finish mbi, start sink-job."
scalebox task add --header dag_run_id=${dag_run_id} "$1"
scalebox task add --header dag_run_id=${dag_run_id} --upsert "$1"
# scalebox task add $1
else
echo "finish mbi, exit_code: $exit_code"
......
@@ -39,7 +39,7 @@ else
fi
if [ $exit_code -eq 0 ]; then
scalebox task add --header dag_run_id=${dag_run_id} "$1"
scalebox task add --header dag_run_id=${dag_run_id} --upsert "$1"
echo "finish qc0, start sink-job."
else
echo "finish qc0, exit_code: $exit_code"
......
@@ -48,7 +48,7 @@ do
#send-message $obsid
job_name="message-router-csst"
scalebox task add --app-id ${app_id} --header dag_run_id=${dag_run_id} --sink-job ${job_name} ${message}
scalebox task add --app-id ${app_id} --header dag_run_id=${dag_run_id} --upsert --sink-job ${job_name} ${message}
code=$?
fi
......
redis-cli -h 10.3.10.28 -a 123456 lpush csst_data_list_zxl "{\"dag_id\": \"csst-msc-l1-mbi\", \"parameters\": {\"dag_run_id\": \"202411111841239406\", \"message\": {\"obsid\": \"11009101682009\", \"chipid\": \"16\"}}}"
scalebox task add --app-id 16 --header dag_run_id="202411258348645214" --upsert --sink-job "message-router-csst" "11009101682009-09"
\ No newline at end of file