From 4e27fcc14ad3270f52cfba0fe55609f7645c0fba Mon Sep 17 00:00:00 2001 From: dl239 Date: Tue, 21 Oct 2025 16:39:31 +0800 Subject: [PATCH 1/7] feat: support hami ascend device Signed-off-by: dl239 --- .../api/devices/ascend/device_info.go | 530 ++++++++++++++++++ pkg/scheduler/api/devices/config/config.go | 19 + pkg/scheduler/api/devices/config/vnpu.go | 36 ++ pkg/scheduler/api/devices/device_info.go | 220 ++++++++ pkg/scheduler/api/devices/util.go | 96 ++++ pkg/scheduler/api/node_info.go | 7 + 6 files changed, 908 insertions(+) create mode 100644 pkg/scheduler/api/devices/ascend/device_info.go create mode 100644 pkg/scheduler/api/devices/config/vnpu.go create mode 100644 pkg/scheduler/api/devices/device_info.go diff --git a/pkg/scheduler/api/devices/ascend/device_info.go b/pkg/scheduler/api/devices/ascend/device_info.go new file mode 100644 index 0000000000..c64fd4f0f5 --- /dev/null +++ b/pkg/scheduler/api/devices/ascend/device_info.go @@ -0,0 +1,530 @@ +/* +Copyright 2023 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ascend + +import ( + "encoding/json" + "flag" + "fmt" + "sort" + "strconv" + "time" + + "github.com/pkg/errors" + + v1 "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" + "volcano.sh/volcano/pkg/scheduler/api/devices" + "volcano.sh/volcano/pkg/scheduler/api/devices/config" + "volcano.sh/volcano/pkg/scheduler/plugins/util/nodelock" +) + +const ( + NodeLockAscend = "hami.io/mutex.lock" + Ascend910Prefix = "Ascend910" + Ascend910NetworkWeight = 10 + // binpack means the lower device memory remained after this allocation, the better + binpackPolicy = "binpack" + // spread means better put this task into an idle GPU card than a shared GPU card + spreadPolicy = "spread" + binpackMultiplier = 100 + spreadMultiplier = 100 +) + +type AscendDevice struct { + config config.VNPUConfig + nodeRegisterAnno string + useUUIDAnno string + noUseUUIDAnno string + handshakeAnno string + DeviceInfo *devices.DeviceInfo + DeviceUsage *devices.DeviceUsage +} + +type AscendDevices struct { + NodeName string + Type string + Score float64 + Devices map[string]*AscendDevice + CandicateDevice devices.PodSingleDevice +} + +type RuntimeInfo struct { + UUID string `json:"UUID,omitempty"` + Temp string `json:"temp,omitempty"` +} + +var ( + enableAscend bool + configFile string + NodeLockEnable bool +) + +func NewAscendDevices(name string, node *v1.Node) map[string]*AscendDevices { + ascend_devices := make(map[string]*AscendDevices) + if node == nil { + return ascend_devices + } + devs := InitDevices(config.GetConfig().VNPUs) + for _, dev := range devs { + node_devices, err := dev.GetNodeDevices(*node) + if err != nil { + klog.V(5).InfoS("Failed to get node devices", "nodeName", node.Name, "deviceType", dev.CommonWord(), "error", err) + continue + } + as_devices := &AscendDevices{ + NodeName: name, + Type: dev.CommonWord(), + Devices: make(map[string]*AscendDevice), + } + for _, nd := range node_devices { + dev.DeviceInfo = nd + dev.DeviceUsage = &devices.DeviceUsage{ + Used: 0, + Usedmem: 0, + Usedcores: 0, + } + as_devices.Devices[nd.ID] = dev + } + ascend_devices[dev.CommonWord()] = as_devices + } + return ascend_devices +} + +func (ads *AscendDevices) AddResourceUsage(id string, cores int32, mem int32) error { + dev, ok := ads.Devices[id] + if !ok { + return fmt.Errorf("ascend device %s not found", id) + } + dev.DeviceUsage.Used++ + dev.DeviceUsage.Usedcores += cores + dev.DeviceUsage.Usedmem += mem + return nil +} + +func (ads *AscendDevices) SubResourceUsage(id string, cores int32, mem int32) error { + dev, ok := ads.Devices[id] + if !ok { + return fmt.Errorf("ascend device %s not found", id) + } + dev.DeviceUsage.Used-- + dev.DeviceUsage.Usedcores -= cores + dev.DeviceUsage.Usedmem -= mem + return nil +} + +func (ads *AscendDevices) AddResource(pod *v1.Pod) { + if ads == nil { + return + } + ads.addResource(pod.Annotations, pod) +} + +func (ads *AscendDevices) SubResource(pod *v1.Pod) { + if ads == nil { + return + } + ano_key := devices.InRequestDevices[ads.Type] + ano, ok := pod.Annotations[ano_key] + if !ok { + klog.Errorf("pod %s has no annotation %s", pod.Name, ano_key) + return + } + con_devs, err := devices.DecodeContainerDevices(ano) + if err != nil { + klog.ErrorS(err, "failed to decode container devices", "pod", pod.Name, "annotation", ano) + return + } + for _, cono_dev := range con_devs { + ads.SubResourceUsage(cono_dev.UUID, cono_dev.Usedcores, cono_dev.Usedmem) + } +} + +func (ads *AscendDevices) addResource(annotations map[string]string, pod *v1.Pod) { + ano_key := devices.InRequestDevices[ads.Type] + ano, ok := annotations[ano_key] + if !ok { + klog.Errorf("pod %s has no annotation %s", pod.Name, ano_key) + return + } + con_devs, err := devices.DecodeContainerDevices(ano) + if err != nil { + klog.ErrorS(err, "failed to decode container devices", "pod", pod.Name, "annotation", ano) + return + } + for _, cono_dev := range con_devs { + ads.AddResourceUsage(cono_dev.UUID, cono_dev.Usedcores, cono_dev.Usedmem) + } +} + +func (ads *AscendDevices) AddQueueResource(pod *v1.Pod) map[string]float64 { + return map[string]float64{} +} + +func (ads *AscendDevices) HasDeviceRequest(pod *v1.Pod) bool { + rand_dev, err := ads.getRandomDevice() + if rand_dev == nil || err != nil { + return false + } + var vnpu_config = rand_dev.config + for _, container := range pod.Spec.Containers { + _, ok := container.Resources.Limits[v1.ResourceName(vnpu_config.ResourceName)] + if ok { + return true + } + _, ok = container.Resources.Limits[v1.ResourceName(vnpu_config.ResourceMemoryName)] + if ok { + return true + } + } + return false +} + +func getAscendDevicesSnapShot(ads *AscendDevices) *AscendDevices { + dup_ads := &AscendDevices{ + Devices: make(map[string]*AscendDevice), + } + for id, dev := range ads.Devices { + dup_dev := &AscendDevice{ + config: dev.config, + nodeRegisterAnno: dev.nodeRegisterAnno, + useUUIDAnno: dev.useUUIDAnno, + noUseUUIDAnno: dev.noUseUUIDAnno, + handshakeAnno: dev.handshakeAnno, + DeviceInfo: dev.DeviceInfo, + DeviceUsage: &devices.DeviceUsage{ + Used: dev.DeviceUsage.Used, + Usedmem: dev.DeviceUsage.Usedmem, + Usedcores: dev.DeviceUsage.Usedcores, + }, + } + dup_ads.Devices[id] = dup_dev + } + return dup_ads +} + +func (ads *AscendDevices) FilterNode(pod *v1.Pod, policy string) (int, string, error) { + devs, err := ads.selectDevices(pod, policy) + if err != nil { + return devices.Error, "no ascend device available", err + } + klog.V(4).Infoln("ascend DeviceSharing successfully filters pods. device_type:", ads.Type) + ads.CandicateDevice = devs + // cal score + ads.Score = 0 + for _, container_dev := range devs { + for _, d := range container_dev { + device_info, ok := ads.Devices[d.UUID] + if !ok { + continue + } + ads.Score += CalScore(policy, device_info.DeviceUsage, device_info.DeviceInfo) + } + } + return devices.Success, "", nil +} + +func (ads *AscendDevices) ScoreNode(pod *v1.Pod, policy string) float64 { + return ads.Score +} + +func (ads *AscendDevices) Allocate(kubeClient kubernetes.Interface, pod *v1.Pod) error { + klog.V(4).Infoln("hami-vnpu DeviceSharing: Into AllocateToPod", pod.Name) + if NodeLockEnable { + nodelock.UseClient(kubeClient) + err := nodelock.LockNode(ads.NodeName, ads.Type) + if err != nil { + return errors.Errorf("node %s locked for %s hamivgpu lockname %s", ads.NodeName, pod.Name, err.Error()) + } + } + annotations := make(map[string]string) + ads.PatchAnnotations(pod, &annotations, ads.CandicateDevice) + + ads.addResource(annotations, pod) + err := devices.PatchPodAnnotations(kubeClient, pod, annotations) + if err != nil { + return err + } + if NodeLockEnable { + nodelock.ReleaseNodeLock(ads.NodeName, ads.Type) + } + klog.V(3).Infoln("DeviceSharing:Allocate Success") + return nil +} + +func (ads *AscendDevices) Release(kubeClient kubernetes.Interface, pod *v1.Pod) error { + return nil +} + +func (ads *AscendDevices) GetIgnoredDevices() []string { + return []string{""} +} + +func (ads *AscendDevices) GetStatus() string { + return "" +} + +func (ads *AscendDevices) selectDevices(pod *v1.Pod, schedulePolicy string) (devices.PodSingleDevice, error) { + rand_dev, err := ads.getRandomDevice() + if err != nil { + return nil, errors.Errorf("no ascend device available") + } + reqs := rand_dev.ResourceReqs(pod) + dup_ads := getAscendDevicesSnapShot(ads) + var pod_devs devices.PodSingleDevice + for _, req := range reqs { + var selected_devs devices.ContainerDevices + for _, dup_ad := range dup_ads.Devices { + if req.Type != ads.Type { + continue + } + device_usage := dup_ad.DeviceUsage + device_info := dup_ad.DeviceInfo + if device_info.Count < device_usage.Used { + continue + } + memreq := int32(0) + if req.Memreq > 0 { + memreq = req.Memreq + } else if req.MemPercentagereq != 101 && req.Memreq == 0 { + memreq = device_info.Devmem * req.MemPercentagereq / 100 + } + if device_info.Devmem-device_usage.Usedmem < memreq { + continue + } + if device_info.Devcore-device_usage.Usedcores < req.Coresreq { + continue + } + if device_info.Devcore == 100 && req.Coresreq == 100 && device_usage.Used > 0 { + continue + } + if device_info.Devcore != 0 && device_usage.Usedcores == device_info.Devcore && req.Coresreq == 0 { + continue + } + selected_devs = append(selected_devs, devices.ContainerDevice{ + UUID: device_info.ID, + Type: ads.Type, + Usedmem: memreq, + Usedcores: req.Coresreq, + CustomInfo: device_info.CustomInfo, + }) + break + } + if len(selected_devs) < int(req.Nums) { + return nil, errors.Errorf("no enough ascend device available") + } + pod_devs = append(pod_devs, selected_devs) + } + return pod_devs, nil +} + +func (ads *AscendDevices) getRandomDevice() (*AscendDevice, error) { + if len(ads.Devices) == 0 { + return nil, errors.New("no ascend device available") + } + for _, dev := range ads.Devices { + return dev, nil + } + return nil, errors.New("no ascend device available") +} + +func (dev *AscendDevice) trimMemory(m int64) (int64, string) { + for i := range dev.config.Templates { + if m <= dev.config.Templates[i].Memory { + return dev.config.Templates[i].Memory, dev.config.Templates[i].Name + } + } + if m <= dev.config.MemoryCapacity { + return dev.config.MemoryAllocatable, "" + } + return 0, "" +} + +func InitDevices(config []config.VNPUConfig) []*AscendDevice { + var devs []*AscendDevice + if !enableAscend { + return devs + } + for _, vnpu := range config { + commonWord := vnpu.CommonWord + dev := &AscendDevice{ + config: vnpu, + nodeRegisterAnno: fmt.Sprintf("hami.io/node-register-%s", commonWord), + useUUIDAnno: fmt.Sprintf("hami.io/use-%s-uuid", commonWord), + noUseUUIDAnno: fmt.Sprintf("hami.io/no-use-%s-uuid", commonWord), + handshakeAnno: fmt.Sprintf("hami.io/node-handshake-%s", commonWord), + } + sort.Slice(dev.config.Templates, func(i, j int) bool { + return dev.config.Templates[i].Memory < dev.config.Templates[j].Memory + }) + _, ok := devices.InRequestDevices[commonWord] + if !ok { + devices.InRequestDevices[commonWord] = fmt.Sprintf("hami.io/%s-devices-to-allocate", commonWord) + devices.SupportDevices[commonWord] = fmt.Sprintf("hami.io/%s-devices-allocated", commonWord) + // util.HandshakeAnnos[commonWord] = dev.handshakeAnno + } + devs = append(devs, dev) + klog.Infof("load ascend vnpu config %s: %v", commonWord, dev.config) + } + return devs +} + +func ParseConfig(fs *flag.FlagSet) { + fs.BoolVar(&enableAscend, "enable-ascend", false, "enable ascend device") +} + +func (dev *AscendDevice) CommonWord() string { + return dev.config.CommonWord +} + +func (dev *AscendDevice) GetNodeDevices(n v1.Node) ([]*devices.DeviceInfo, error) { + anno, ok := n.Annotations[dev.nodeRegisterAnno] + if !ok { + return []*devices.DeviceInfo{}, fmt.Errorf("annos not found %s", dev.nodeRegisterAnno) + } + nodeDevices, err := devices.UnMarshalNodeDevices(anno) + if err != nil { + klog.ErrorS(err, "failed to unmarshal node devices", "node", n.Name, "device annotation", anno) + return []*devices.DeviceInfo{}, err + } + if len(nodeDevices) == 0 { + klog.InfoS("no gpu device found", "node", n.Name, "device annotation", anno) + return []*devices.DeviceInfo{}, errors.New("no device found on node") + } + return nodeDevices, nil +} + +func (dev *AscendDevice) GenerateResourceRequests(ctr *v1.Container) devices.ContainerDeviceRequest { + ascendResourceCount := v1.ResourceName(dev.config.ResourceName) + ascendResourceMem := v1.ResourceName(dev.config.ResourceMemoryName) + v, ok := ctr.Resources.Limits[ascendResourceCount] + if !ok { + v, ok = ctr.Resources.Requests[ascendResourceCount] + } + if ok { + klog.V(3).Infof("Counting %s devices", dev.config.CommonWord) + if n, ok := v.AsInt64(); ok { + klog.Info("Found AscendDevices devices") + memnum := 0 + mem, ok := ctr.Resources.Limits[ascendResourceMem] + if !ok { + mem, ok = ctr.Resources.Requests[ascendResourceMem] + } + if ok { + memnums, ok := mem.AsInt64() + if ok { + m, _ := dev.trimMemory(memnums) + memnum = int(m) + } + } + corenum := int32(0) + + mempnum := 0 + if memnum == 0 { + mempnum = 100 + } + + return devices.ContainerDeviceRequest{ + Nums: int32(n), + Type: dev.CommonWord(), + Memreq: int32(memnum), + MemPercentagereq: int32(mempnum), + Coresreq: corenum, + } + } + } + return devices.ContainerDeviceRequest{} +} + +func (dev *AscendDevice) ResourceReqs(pod *v1.Pod) []devices.ContainerDeviceRequest { + var reqs []devices.ContainerDeviceRequest + for _, ctr := range pod.Spec.Containers { + req := dev.GenerateResourceRequests(&ctr) + if req.Nums > 0 { + reqs = append(reqs, req) + } + } + return reqs +} + +func (ads *AscendDevices) PatchAnnotations(pod *v1.Pod, annoInput *map[string]string, devList devices.PodSingleDevice) map[string]string { + dev, err := ads.getRandomDevice() + if err != nil { + return *annoInput + } + commonWord := dev.CommonWord() + + (*annoInput)[devices.InRequestDevices[commonWord]] = devices.EncodePodSingleDevice(devList) + (*annoInput)[devices.SupportDevices[commonWord]] = devices.EncodePodSingleDevice(devList) + (*annoInput)["predicate-time"] = strconv.FormatInt(time.Now().Unix(), 10) + allocateStr := fmt.Sprintf("huawei.com/%s", dev.CommonWord()) + var rtInfo []RuntimeInfo + for _, dp := range devList { + for _, val := range dp { + _, temp := dev.trimMemory(int64(val.Usedmem)) + rtInfo = append(rtInfo, RuntimeInfo{ + UUID: val.UUID, + Temp: temp, + }) + } + } + s, err := json.Marshal(rtInfo) + if err != nil { + klog.ErrorS(err, "failed to marshal runtime info", "runtime info", rtInfo) + } + (*annoInput)[allocateStr] = string(s) + + return *annoInput +} + +func (dev *AscendDevice) GetResourceNames() devices.ResourceNames { + return devices.ResourceNames{ + ResourceCountName: dev.config.ResourceName, + ResourceMemoryName: dev.config.ResourceMemoryName, + ResourceCoreName: "", + } +} + +func CalScore(schedulePolicy string, dev_usage *devices.DeviceUsage, dev_info *devices.DeviceInfo) float64 { + var score float64 + switch schedulePolicy { + case binpackPolicy: + score = binpackMultiplier * (float64(dev_usage.Usedmem) / float64(dev_info.Devmem)) + case spreadPolicy: + if dev_usage.Used == 1 { + score = spreadMultiplier + } + default: + score = float64(0) + } + return score +} diff --git a/pkg/scheduler/api/devices/config/config.go b/pkg/scheduler/api/devices/config/config.go index ad61bacbf2..d5dc56d5dc 100644 --- a/pkg/scheduler/api/devices/config/config.go +++ b/pkg/scheduler/api/devices/config/config.go @@ -44,11 +44,30 @@ const ( hygon: resourceCountName: "volcano.sh/vdcu" ... + vnpus: + - chipName: 910B3 + commonWord: Ascend910B3 + resourceName: huawei.com/Ascend910B3 + resourceMemoryName: huawei.com/Ascend910B3-memory + memoryAllocatable: 65536 + memoryCapacity: 65536 + aiCore: 20 + aiCPU: 7 + templates: + - name: vir05_1c_16g + memory: 16384 + aiCore: 5 + aiCPU: 1 + - name: vir10_3c_32g + memory: 32768 + aiCore: 10 + aiCPU: 3 */ type Config struct { //NvidiaConfig is used for vGPU feature for nvidia, gpushare is not using this config NvidiaConfig NvidiaConfig `yaml:"nvidia"` + VNPUs []VNPUConfig `yaml:"vnpus"` } var ( diff --git a/pkg/scheduler/api/devices/config/vnpu.go b/pkg/scheduler/api/devices/config/vnpu.go new file mode 100644 index 0000000000..dc02910352 --- /dev/null +++ b/pkg/scheduler/api/devices/config/vnpu.go @@ -0,0 +1,36 @@ +/* +Copyright 2025 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +type Template struct { + Name string `yaml:"name"` + Memory int64 `yaml:"memory"` + AICore int32 `yaml:"aiCore,omitempty"` + AICPU int32 `yaml:"aiCPU,omitempty"` +} + +type VNPUConfig struct { + CommonWord string `yaml:"commonWord"` + ChipName string `yaml:"chipName"` + ResourceName string `yaml:"resourceName"` + ResourceMemoryName string `yaml:"resourceMemoryName"` + MemoryAllocatable int64 `yaml:"memoryAllocatable"` + MemoryCapacity int64 `yaml:"memoryCapacity"` + AICore int32 `yaml:"aiCore"` + AICPU int32 `yaml:"aiCPU"` + Templates []Template `yaml:"templates"` +} \ No newline at end of file diff --git a/pkg/scheduler/api/devices/device_info.go b/pkg/scheduler/api/devices/device_info.go new file mode 100644 index 0000000000..4e2cbd2ecf --- /dev/null +++ b/pkg/scheduler/api/devices/device_info.go @@ -0,0 +1,220 @@ +/* +Copyright 2023 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package devices + +import ( + "encoding/json" + "errors" + "fmt" + "strconv" + "strings" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/klog/v2" +) + +type DeviceInfo struct { + ID string `json:"id,omitempty"` + Index uint `json:"index,omitempty"` + Count int32 `json:"count,omitempty"` + Devmem int32 `json:"devmem,omitempty"` + Devcore int32 `json:"devcore,omitempty"` + Type string `json:"type,omitempty"` + Numa int `json:"numa,omitempty"` + Mode string `json:"mode,omitempty"` + // MIGTemplate []Geometry `json:"migtemplate,omitempty"` + Health bool `json:"health,omitempty"` + DeviceVendor string `json:"devicevendor,omitempty"` + CustomInfo map[string]any `json:"custominfo,omitempty"` + DevicePairScore DevicePairScore `json:"devicepairscore,omitempty"` +} + +type DevicePairScores []DevicePairScore +type DevicePairScore struct { + ID string `json:"uuid,omitempty"` + Scores map[string]int `json:"score,omitempty"` +} + +type DeviceUsage struct { + // ID string + // Index uint + Used int32 + // Count int32 + Usedmem int32 + // Totalmem int32 + // Totalcore int32 + Usedcores int32 + // Mode string + // MigTemplate []Geometry + // MigUsage MigInUse + // Numa int + // Type string + // Health bool + // CustomInfo map[string]any +} + +type ResourceNames struct { + ResourceCountName string + ResourceMemoryName string + ResourceCoreName string +} + +type ContainerDevice struct { + // TODO current Idx cannot use, because EncodeContainerDevices method not encode this filed. + Idx int + UUID string + Type string + Usedmem int32 + Usedcores int32 + CustomInfo map[string]any +} + +type ContainerDeviceRequest struct { + Nums int32 + Type string + Memreq int32 + MemPercentagereq int32 + Coresreq int32 +} + +const ( + // OneContainerMultiDeviceSplitSymbol this is when one container use multi device, use : symbol to join device info. + OneContainerMultiDeviceSplitSymbol = ":" + + // OnePodMultiContainerSplitSymbol this is when one pod having multi container and more than one container use device, use ; symbol to join device info. + OnePodMultiContainerSplitSymbol = ";" +) + +var ( + HandshakeAnnos = map[string]string{} + InRequestDevices map[string]string + SupportDevices map[string]string +) + +type ContainerDevices []ContainerDevice +type ContainerDeviceRequests map[string]ContainerDeviceRequest +type PodDeviceRequests []ContainerDeviceRequests +type PodSingleDevice []ContainerDevices +type PodDevices map[string]PodSingleDevice + +func CheckHealth(devType string, n *corev1.Node) (bool, bool) { + handshake := n.Annotations[HandshakeAnnos[devType]] + if strings.Contains(handshake, "Requesting") { + formertime, _ := time.Parse(time.DateTime, strings.Split(handshake, "_")[1]) + return time.Now().Before(formertime.Add(time.Second * 60)), false + } else if strings.Contains(handshake, "Deleted") { + return true, false + } else { + return true, true + } +} + +func UnMarshalNodeDevices(str string) ([]*DeviceInfo, error) { + var dlist []*DeviceInfo + err := json.Unmarshal([]byte(str), &dlist) + return dlist, err +} + +func EncodeContainerDevices(cd ContainerDevices) string { + tmp := "" + for _, val := range cd { + tmp += val.UUID + "," + val.Type + "," + strconv.Itoa(int(val.Usedmem)) + "," + strconv.Itoa(int(val.Usedcores)) + OneContainerMultiDeviceSplitSymbol + } + klog.Infof("Encoded container Devices: %s", tmp) + return tmp + //return strings.Join(cd, ",") +} + +func EncodePodSingleDevice(pd PodSingleDevice) string { + res := "" + for _, ctrdevs := range pd { + res = res + EncodeContainerDevices(ctrdevs) + res = res + OnePodMultiContainerSplitSymbol + } + klog.Infof("Encoded pod single devices %s", res) + return res +} + +func DecodeContainerDevices(str string) (ContainerDevices, error) { + if len(str) == 0 { + return ContainerDevices{}, nil + } + cd := strings.Split(str, OneContainerMultiDeviceSplitSymbol) + contdev := ContainerDevices{} + tmpdev := ContainerDevice{} + klog.V(5).Infof("Start to decode container device %s", str) + for _, val := range cd { + if strings.Contains(val, ",") { + //fmt.Println("cd is ", val) + tmpstr := strings.Split(val, ",") + if len(tmpstr) < 4 { + return ContainerDevices{}, fmt.Errorf("pod annotation format error; information missing, please do not use nodeName field in task") + } + tmpdev.UUID = tmpstr[0] + tmpdev.Type = tmpstr[1] + devmem, _ := strconv.ParseInt(tmpstr[2], 10, 32) + tmpdev.Usedmem = int32(devmem) + devcores, _ := strconv.ParseInt(tmpstr[3], 10, 32) + tmpdev.Usedcores = int32(devcores) + contdev = append(contdev, tmpdev) + } + } + klog.V(5).Infof("Finished decoding container devices. Total devices: %d", len(contdev)) + return contdev, nil +} + +func DecodePodDevices(checklist map[string]string, annos map[string]string) (PodDevices, error) { + klog.V(5).Infof("checklist is [%+v], annos is [%+v]", checklist, annos) + if len(annos) == 0 { + return PodDevices{}, nil + } + pd := make(PodDevices) + for devID, devs := range checklist { + str, ok := annos[devs] + if !ok { + continue + } + pd[devID] = make(PodSingleDevice, 0) + for s := range strings.SplitSeq(str, OnePodMultiContainerSplitSymbol) { + cd, err := DecodeContainerDevices(s) + if err != nil { + return PodDevices{}, nil + } + if len(cd) == 0 { + continue + } + pd[devID] = append(pd[devID], cd) + } + } + klog.V(5).InfoS("Decoded pod annos", "poddevices", pd) + return pd, nil +} \ No newline at end of file diff --git a/pkg/scheduler/api/devices/util.go b/pkg/scheduler/api/devices/util.go index e23829f663..cd45516332 100644 --- a/pkg/scheduler/api/devices/util.go +++ b/pkg/scheduler/api/devices/util.go @@ -31,6 +31,15 @@ limitations under the License. package devices import ( + "context" + "encoding/json" + "fmt" + "time" + + v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8stypes "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/klog/v2" @@ -81,3 +90,90 @@ func NewClient() (*kubernetes.Clientset, error) { client, err := kubernetes.NewForConfig(config) return client, err } + +func GetNode(nodename string) (*v1.Node, error) { + if nodename == "" { + klog.ErrorS(nil, "Node name is empty") + return nil, fmt.Errorf("nodename is empty") + } + + klog.V(5).InfoS("Fetching node", "nodeName", nodename) + n, err := GetClient().CoreV1().Nodes().Get(context.Background(), nodename, metav1.GetOptions{}) + if err != nil { + switch { + case apierrors.IsNotFound(err): + klog.ErrorS(err, "Node not found", "nodeName", nodename) + return nil, fmt.Errorf("node %s not found", nodename) + case apierrors.IsUnauthorized(err): + klog.ErrorS(err, "Unauthorized to access node", "nodeName", nodename) + return nil, fmt.Errorf("unauthorized to access node %s", nodename) + default: + klog.ErrorS(err, "Failed to get node", "nodeName", nodename) + return nil, fmt.Errorf("failed to get node %s: %v", nodename, err) + } + } + + klog.V(5).InfoS("Successfully fetched node", "nodeName", nodename) + return n, nil +} + +func MarkAnnotationsToDelete(devType string, nn string) error { + tmppat := make(map[string]string) + tmppat[devType] = "Deleted_" + time.Now().Format(time.DateTime) + n, err := GetNode(nn) + if err != nil { + klog.Errorln("get node failed", err.Error()) + return err + } + return PatchNodeAnnotations(n, tmppat) +} + +func PatchPodAnnotations(kubeClient kubernetes.Interface, pod *v1.Pod, annotations map[string]string) error { + type patchMetadata struct { + Annotations map[string]string `json:"annotations,omitempty"` + } + type patchPod struct { + Metadata patchMetadata `json:"metadata"` + //Spec patchSpec `json:"spec,omitempty"` + } + + p := patchPod{} + p.Metadata.Annotations = annotations + + bytes, err := json.Marshal(p) + if err != nil { + return err + } + _, err = kubeClient.CoreV1().Pods(pod.Namespace). + Patch(context.Background(), pod.Name, k8stypes.StrategicMergePatchType, bytes, metav1.PatchOptions{}) + if err != nil { + klog.Errorf("patch pod %v failed, %v", pod.Name, err) + } + + return err +} + +func PatchNodeAnnotations(node *v1.Node, annotations map[string]string) error { + type patchMetadata struct { + Annotations map[string]string `json:"annotations,omitempty"` + } + type patchPod struct { + Metadata patchMetadata `json:"metadata"` + //Spec patchSpec `json:"spec,omitempty"` + } + + p := patchPod{} + p.Metadata.Annotations = annotations + + bytes, err := json.Marshal(p) + if err != nil { + return err + } + _, err = GetClient().CoreV1().Nodes(). + Patch(context.Background(), node.Name, k8stypes.StrategicMergePatchType, bytes, metav1.PatchOptions{}) + if err != nil { + klog.Infoln("annotations=", annotations) + klog.Infof("patch node %v failed, %v", node.Name, err) + } + return err +} diff --git a/pkg/scheduler/api/node_info.go b/pkg/scheduler/api/node_info.go index 12306aeae7..b8b44e6153 100644 --- a/pkg/scheduler/api/node_info.go +++ b/pkg/scheduler/api/node_info.go @@ -24,6 +24,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" fwk "k8s.io/kube-scheduler/framework" + "volcano.sh/volcano/pkg/scheduler/api/devices/ascend" "volcano.sh/apis/pkg/apis/scheduling" "volcano.sh/apis/pkg/apis/scheduling/v1beta1" @@ -350,10 +351,16 @@ func (ni *NodeInfo) setNodeOthersResource(node *v1.Node) { ni.Others[gpushare.DeviceName] = gpushare.NewGPUDevices(ni.Name, node) ni.Others[vgpu.DeviceName] = vgpu.NewGPUDevices(ni.Name, node) ni.Others[vnpu.DeviceName] = vnpu.NewNPUDevices(ni.Name, node) + ascend_ignored_list := []string{} + for device_name, devices := range ascend.NewAscendDevices(ni.Name, node) { + ni.Others[device_name] = devices + ascend_ignored_list = append(ascend_ignored_list, devices.GetIgnoredDevices()...) + } IgnoredDevicesList.Set( ni.Others[gpushare.DeviceName].(Devices).GetIgnoredDevices(), ni.Others[vgpu.DeviceName].(Devices).GetIgnoredDevices(), ni.Others[vnpu.DeviceName].(Devices).GetIgnoredDevices(), + ascend_ignored_list, ) } From f81e076f2530e62db5721f2750e93f53c0841d90 Mon Sep 17 00:00:00 2001 From: dl239 Date: Thu, 23 Oct 2025 18:41:35 +0800 Subject: [PATCH 2/7] feat: support ascend910 topology awareness Signed-off-by: dl239 --- .../api/devices/ascend/device_info.go | 277 +++++++++++++----- 1 file changed, 199 insertions(+), 78 deletions(-) diff --git a/pkg/scheduler/api/devices/ascend/device_info.go b/pkg/scheduler/api/devices/ascend/device_info.go index c64fd4f0f5..8b73e2bcdb 100644 --- a/pkg/scheduler/api/devices/ascend/device_info.go +++ b/pkg/scheduler/api/devices/ascend/device_info.go @@ -36,6 +36,7 @@ import ( "fmt" "sort" "strconv" + "strings" "time" "github.com/pkg/errors" @@ -68,14 +69,14 @@ type AscendDevice struct { handshakeAnno string DeviceInfo *devices.DeviceInfo DeviceUsage *devices.DeviceUsage + Score float64 } type AscendDevices struct { NodeName string Type string - Score float64 Devices map[string]*AscendDevice - CandicateDevice devices.PodSingleDevice + Policy string } type RuntimeInfo struct { @@ -209,52 +210,60 @@ func (ads *AscendDevices) HasDeviceRequest(pod *v1.Pod) bool { return false } -func getAscendDevicesSnapShot(ads *AscendDevices) *AscendDevices { - dup_ads := &AscendDevices{ - Devices: make(map[string]*AscendDevice), - } - for id, dev := range ads.Devices { - dup_dev := &AscendDevice{ - config: dev.config, - nodeRegisterAnno: dev.nodeRegisterAnno, - useUUIDAnno: dev.useUUIDAnno, - noUseUUIDAnno: dev.noUseUUIDAnno, - handshakeAnno: dev.handshakeAnno, - DeviceInfo: dev.DeviceInfo, - DeviceUsage: &devices.DeviceUsage{ - Used: dev.DeviceUsage.Used, - Usedmem: dev.DeviceUsage.Usedmem, - Usedcores: dev.DeviceUsage.Usedcores, - }, - } - dup_ads.Devices[id] = dup_dev - } - return dup_ads -} - func (ads *AscendDevices) FilterNode(pod *v1.Pod, policy string) (int, string, error) { - devs, err := ads.selectDevices(pod, policy) + _, err := ads.selectDevices(pod, policy) if err != nil { return devices.Error, "no ascend device available", err } klog.V(4).Infoln("ascend DeviceSharing successfully filters pods. device_type:", ads.Type) - ads.CandicateDevice = devs - // cal score - ads.Score = 0 - for _, container_dev := range devs { - for _, d := range container_dev { - device_info, ok := ads.Devices[d.UUID] - if !ok { - continue - } - ads.Score += CalScore(policy, device_info.DeviceUsage, device_info.DeviceInfo) - } - } return devices.Success, "", nil } func (ads *AscendDevices) ScoreNode(pod *v1.Pod, policy string) float64 { - return ads.Score + ads.Policy = policy + pod_devs, err := ads.selectDevices(pod, policy) + if err != nil { + return 0 + } + score := 0.0 + var used_devs []*AscendDevice + for _, dev := range pod_devs { + dev, ok := ads.Devices[dev[0].UUID] + if !ok { + return 0 + } + used_devs = append(used_devs, dev) + score += CalScore(policy, dev.DeviceUsage, dev.DeviceInfo) + } + + if strings.HasPrefix(ads.Type, Ascend910Prefix) && hasNetworkID(used_devs) { + klog.V(4).Infof("all devices have NetworkID. device CommonWord %s", ads.Type) + cntMap := make(map[int]int) + for _, dev := range used_devs { + if dev.DeviceInfo.CustomInfo == nil { + return 0 + } + if networkID, ok := dev.DeviceInfo.CustomInfo["NetworkID"]; ok { + if id, ok := networkID.(float64); ok { + cntMap[int(id)]++ + } + } else { + return 0 + } + } + maxCnt, totalCnt := 0, 0 + for _, cnt := range cntMap { + if cnt > maxCnt { + maxCnt = cnt + } + totalCnt += cnt + } + if totalCnt == 0 { + return 0 + } + score += (float64(maxCnt) / float64(totalCnt)) * Ascend910NetworkWeight + } + return score } func (ads *AscendDevices) Allocate(kubeClient kubernetes.Interface, pod *v1.Pod) error { @@ -266,11 +275,15 @@ func (ads *AscendDevices) Allocate(kubeClient kubernetes.Interface, pod *v1.Pod) return errors.Errorf("node %s locked for %s hamivgpu lockname %s", ads.NodeName, pod.Name, err.Error()) } } + pod_devs, err := ads.selectDevices(pod, ads.Policy) + if err != nil { + return errors.Errorf("failed to select ascend devices for pod %s: %v", pod.Name, err) + } annotations := make(map[string]string) - ads.PatchAnnotations(pod, &annotations, ads.CandicateDevice) + ads.PatchAnnotations(pod, &annotations, pod_devs) ads.addResource(annotations, pod) - err := devices.PatchPodAnnotations(kubeClient, pod, annotations) + err = devices.PatchPodAnnotations(kubeClient, pod, annotations) if err != nil { return err } @@ -294,59 +307,167 @@ func (ads *AscendDevices) GetStatus() string { } func (ads *AscendDevices) selectDevices(pod *v1.Pod, schedulePolicy string) (devices.PodSingleDevice, error) { - rand_dev, err := ads.getRandomDevice() - if err != nil { + dup_devs := getDeviceSnapshot(ads) + if len(dup_devs) == 0 { return nil, errors.Errorf("no ascend device available") } - reqs := rand_dev.ResourceReqs(pod) - dup_ads := getAscendDevicesSnapShot(ads) + for _, dev := range dup_devs { + dev.Score = CalScore(schedulePolicy, dev.DeviceUsage, dev.DeviceInfo) + } + sort.Slice(dup_devs, func(i, j int) bool { + return dup_devs[i].Score > dup_devs[j].Score + }) + needTopology := false + if strings.HasPrefix(ads.Type, Ascend910Prefix) && hasNetworkID(dup_devs) { + klog.V(4).Infof("all devices have NetworkID. device CommonWord %s", ads.Type) + needTopology = true + } + reqs := dup_devs[0].ResourceReqs(pod) var pod_devs devices.PodSingleDevice + used_devs := make([]*AscendDevice, 0) for _, req := range reqs { - var selected_devs devices.ContainerDevices - for _, dup_ad := range dup_ads.Devices { - if req.Type != ads.Type { - continue - } - device_usage := dup_ad.DeviceUsage - device_info := dup_ad.DeviceInfo - if device_info.Count < device_usage.Used { - continue - } - memreq := int32(0) - if req.Memreq > 0 { - memreq = req.Memreq - } else if req.MemPercentagereq != 101 && req.Memreq == 0 { - memreq = device_info.Devmem * req.MemPercentagereq / 100 - } - if device_info.Devmem-device_usage.Usedmem < memreq { - continue + available_devs := make([]*AscendDevice, 0) + for _, dev := range dup_devs { + selected := false + for _, used_dev := range used_devs { + if used_dev.DeviceInfo.ID == dev.DeviceInfo.ID { + selected = true + break + } } - if device_info.Devcore-device_usage.Usedcores < req.Coresreq { - continue + if !selected { + available_devs = append(available_devs, dev) } - if device_info.Devcore == 100 && req.Coresreq == 100 && device_usage.Used > 0 { + } + req_nums := req.Nums + selected_devs := make([]*AscendDevice, 0) + for _, dev := range available_devs { + if fit(&req, dev) == false { continue } - if device_info.Devcore != 0 && device_usage.Usedcores == device_info.Devcore && req.Coresreq == 0 { - continue + selected_devs = append(selected_devs, dev) + req_nums -= 1 + if req_nums <= 0 && !needTopology { + break } - selected_devs = append(selected_devs, devices.ContainerDevice{ - UUID: device_info.ID, - Type: ads.Type, - Usedmem: memreq, - Usedcores: req.Coresreq, - CustomInfo: device_info.CustomInfo, - }) - break } - if len(selected_devs) < int(req.Nums) { + if req_nums >= 0 { return nil, errors.Errorf("no enough ascend device available") } - pod_devs = append(pod_devs, selected_devs) + if needTopology { + selected_devs = selectDevicesWithTopology(int(req.Nums), selected_devs) + } + used_devs = append(used_devs, selected_devs...) + var con_devs devices.ContainerDevices + for _, dev := range selected_devs { + con_devs = append(con_devs, devices.ContainerDevice{ + UUID: dev.DeviceInfo.ID, + Type: ads.Type, + Usedmem: req.Memreq, + Usedcores: req.Coresreq, + CustomInfo: dev.DeviceInfo.CustomInfo, + }) + } + pod_devs = append(pod_devs, con_devs) } return pod_devs, nil } +func hasNetworkID(devices []*AscendDevice) bool { + for _, dev := range devices { + if dev.DeviceInfo.CustomInfo == nil { + return false + } + if _, ok := dev.DeviceInfo.CustomInfo["NetworkID"]; !ok { + return false + } + } + return true +} + +func fit(req *devices.ContainerDeviceRequest, dev *AscendDevice) bool { + if req.Type != dev.config.CommonWord { + return false + } + device_usage := dev.DeviceUsage + device_info := dev.DeviceInfo + if device_info.Count < device_usage.Used { + return false + } + if device_info.Devmem-device_usage.Usedmem < req.Memreq { + return false + } + if device_info.Devcore-device_usage.Usedcores < req.Coresreq { + return false + } + if device_info.Devcore == 100 && req.Coresreq == 100 && device_usage.Used > 0 { + return false + } + if device_info.Devcore != 0 && device_usage.Usedcores == device_info.Devcore && req.Coresreq == 0 { + return false + } + return true +} + +func getDeviceSnapshot(ads *AscendDevices) []*AscendDevice { + dup_devs := make([]*AscendDevice, 0) + for _, dev := range ads.Devices { + dup_dev := &AscendDevice{ + config: dev.config, + nodeRegisterAnno: dev.nodeRegisterAnno, + useUUIDAnno: dev.useUUIDAnno, + noUseUUIDAnno: dev.noUseUUIDAnno, + handshakeAnno: dev.handshakeAnno, + DeviceInfo: dev.DeviceInfo, + DeviceUsage: &devices.DeviceUsage{ + Used: dev.DeviceUsage.Used, + Usedmem: dev.DeviceUsage.Usedmem, + Usedcores: dev.DeviceUsage.Usedcores, + }, + } + dup_devs = append(dup_devs, dup_dev) + } + return dup_devs +} + +func selectDevicesWithTopology(req_nums int, selected_devs []*AscendDevice) []*AscendDevice { + network_map := make(map[int][]*AscendDevice) + + for _, dev := range selected_devs { + if dev.DeviceInfo.CustomInfo != nil { + if networkID, ok := dev.DeviceInfo.CustomInfo["NetworkID"]; ok { + if id, ok := networkID.(float64); ok { + network_map[int(id)] = append(network_map[int(id)], dev) + } + } + } + } + type NetworkDeviceCount struct { + NetworkID int + Count int + } + var sortedNetworks []NetworkDeviceCount + for networkID, devices := range network_map { + sortedNetworks = append(sortedNetworks, NetworkDeviceCount{ + NetworkID: networkID, + Count: len(devices), + }) + } + sort.Slice(sortedNetworks, func(i, j int) bool { + return sortedNetworks[i].Count > sortedNetworks[j].Count + }) + devs := make([]*AscendDevice, 0) + for _, item := range sortedNetworks { + for _, dev := range network_map[item.NetworkID] { + devs = append(devs, dev) + if len(devs) == req_nums { + return devs + } + } + } + return devs +} + func (ads *AscendDevices) getRandomDevice() (*AscendDevice, error) { if len(ads.Devices) == 0 { return nil, errors.New("no ascend device available") From f15e3196c4ebc7930b165c9f4df571123fa9967d Mon Sep 17 00:00:00 2001 From: root Date: Mon, 3 Nov 2025 16:53:06 +0800 Subject: [PATCH 3/7] merge --- .../api/devices/ascend/device_info.go | 38 +++++++++++---- pkg/scheduler/api/devices/device_info.go | 7 ++- pkg/scheduler/api/shared_device_pool.go | 6 +++ .../plugins/deviceshare/deviceshare.go | 47 ++++++++++++++++--- 4 files changed, 78 insertions(+), 20 deletions(-) diff --git a/pkg/scheduler/api/devices/ascend/device_info.go b/pkg/scheduler/api/devices/ascend/device_info.go index 8b73e2bcdb..12a5f65307 100644 --- a/pkg/scheduler/api/devices/ascend/device_info.go +++ b/pkg/scheduler/api/devices/ascend/device_info.go @@ -59,6 +59,8 @@ const ( spreadPolicy = "spread" binpackMultiplier = 100 spreadMultiplier = 100 + CMName = "volcano-vgpu-device-config" + CMNamespace = "kube-system" ) type AscendDevice struct { @@ -85,7 +87,7 @@ type RuntimeInfo struct { } var ( - enableAscend bool + AscendVNPUEnable bool configFile string NodeLockEnable bool ) @@ -93,13 +95,20 @@ var ( func NewAscendDevices(name string, node *v1.Node) map[string]*AscendDevices { ascend_devices := make(map[string]*AscendDevices) if node == nil { + klog.Warningf("Node is nil for node %s, returning empty AscendDevices", name) return ascend_devices } + cur_config := config.GetConfig() + if cur_config == nil { + klog.InfoS("cur config is null. call InitDevicesConfig") + config.InitDevicesConfig(CMName, CMNamespace) + } devs := InitDevices(config.GetConfig().VNPUs) + klog.Infof("NewAscendDevices. dev len %d", len(devs)) for _, dev := range devs { node_devices, err := dev.GetNodeDevices(*node) if err != nil { - klog.V(5).InfoS("Failed to get node devices", "nodeName", node.Name, "deviceType", dev.CommonWord(), "error", err) + klog.Warningf("Failed to get node devices. nodeName %s, deviceType %s, error %s" , node.Name, dev.CommonWord(), err) continue } as_devices := &AscendDevices{ @@ -144,6 +153,7 @@ func (ads *AscendDevices) SubResourceUsage(id string, cores int32, mem int32) er } func (ads *AscendDevices) AddResource(pod *v1.Pod) { + klog.Infof("AscendDevices AddResource") if ads == nil { return } @@ -192,6 +202,10 @@ func (ads *AscendDevices) AddQueueResource(pod *v1.Pod) map[string]float64 { } func (ads *AscendDevices) HasDeviceRequest(pod *v1.Pod) bool { + if !AscendVNPUEnable { + return false + } + klog.Infof("xxx %s check HasDeviceRequest", ads.Type) rand_dev, err := ads.getRandomDevice() if rand_dev == nil || err != nil { return false @@ -200,13 +214,16 @@ func (ads *AscendDevices) HasDeviceRequest(pod *v1.Pod) bool { for _, container := range pod.Spec.Containers { _, ok := container.Resources.Limits[v1.ResourceName(vnpu_config.ResourceName)] if ok { + klog.Infof("xxx %s check HasDeviceRequest ok. %s", ads.Type, vnpu_config.ResourceName) return true } _, ok = container.Resources.Limits[v1.ResourceName(vnpu_config.ResourceMemoryName)] if ok { + klog.Infof("xxx %s check HasDeviceRequest ok. %s", ads.Type, vnpu_config.ResourceMemoryName) return true } } + klog.Infof("xxx %s check HasDeviceRequest false", ads.Type) return false } @@ -290,7 +307,7 @@ func (ads *AscendDevices) Allocate(kubeClient kubernetes.Interface, pod *v1.Pod) if NodeLockEnable { nodelock.ReleaseNodeLock(ads.NodeName, ads.Type) } - klog.V(3).Infoln("DeviceSharing:Allocate Success") + klog.V(3).Infoln("Allocate Success") return nil } @@ -299,7 +316,13 @@ func (ads *AscendDevices) Release(kubeClient kubernetes.Interface, pod *v1.Pod) } func (ads *AscendDevices) GetIgnoredDevices() []string { - return []string{""} + rand_dev, err := ads.getRandomDevice() + if rand_dev == nil || err != nil { + return []string{""} + } + vnpu_config := rand_dev.config + klog.Infof("IgnoredDevices %s", vnpu_config.ResourceMemoryName) + return []string{vnpu_config.ResourceMemoryName} } func (ads *AscendDevices) GetStatus() string { @@ -491,10 +514,7 @@ func (dev *AscendDevice) trimMemory(m int64) (int64, string) { } func InitDevices(config []config.VNPUConfig) []*AscendDevice { - var devs []*AscendDevice - if !enableAscend { - return devs - } + devs := make([]*AscendDevice, 0) for _, vnpu := range config { commonWord := vnpu.CommonWord dev := &AscendDevice{ @@ -520,7 +540,7 @@ func InitDevices(config []config.VNPUConfig) []*AscendDevice { } func ParseConfig(fs *flag.FlagSet) { - fs.BoolVar(&enableAscend, "enable-ascend", false, "enable ascend device") + fs.BoolVar(&AscendVNPUEnable, "AscendVNPUEnable", false, "enable ascend device") } func (dev *AscendDevice) CommonWord() string { diff --git a/pkg/scheduler/api/devices/device_info.go b/pkg/scheduler/api/devices/device_info.go index 4e2cbd2ecf..be13a22ebc 100644 --- a/pkg/scheduler/api/devices/device_info.go +++ b/pkg/scheduler/api/devices/device_info.go @@ -32,7 +32,6 @@ package devices import ( "encoding/json" - "errors" "fmt" "strconv" "strings" @@ -116,8 +115,8 @@ const ( var ( HandshakeAnnos = map[string]string{} - InRequestDevices map[string]string - SupportDevices map[string]string + InRequestDevices = map[string]string{} + SupportDevices = map[string]string{} ) type ContainerDevices []ContainerDevice @@ -217,4 +216,4 @@ func DecodePodDevices(checklist map[string]string, annos map[string]string) (Pod } klog.V(5).InfoS("Decoded pod annos", "poddevices", pd) return pd, nil -} \ No newline at end of file +} diff --git a/pkg/scheduler/api/shared_device_pool.go b/pkg/scheduler/api/shared_device_pool.go index c6e524140a..b96196098f 100644 --- a/pkg/scheduler/api/shared_device_pool.go +++ b/pkg/scheduler/api/shared_device_pool.go @@ -23,6 +23,7 @@ import ( "k8s.io/client-go/kubernetes" "volcano.sh/volcano/pkg/scheduler/api/devices/ascend/ascend310p/vnpu" + "volcano.sh/volcano/pkg/scheduler/api/devices/ascend" "volcano.sh/volcano/pkg/scheduler/api/devices/nvidia/gpushare" "volcano.sh/volcano/pkg/scheduler/api/devices/nvidia/vgpu" ) @@ -78,6 +79,7 @@ type Devices interface { var _ Devices = new(gpushare.GPUDevices) var _ Devices = new(vgpu.GPUDevices) var _ Devices = new(vnpu.NPUDevices) +var _ Devices = new(ascend.AscendDevices) var RegisteredDevices = []string{ gpushare.DeviceName, @@ -85,6 +87,10 @@ var RegisteredDevices = []string{ vnpu.DeviceName, } +func RegisterDevice(deviceName string) { + RegisteredDevices = append(RegisteredDevices, deviceName) +} + var IgnoredDevicesList = ignoredDevicesList{} type ignoredDevicesList struct { diff --git a/pkg/scheduler/plugins/deviceshare/deviceshare.go b/pkg/scheduler/plugins/deviceshare/deviceshare.go index fc59e36d41..cbff019973 100644 --- a/pkg/scheduler/plugins/deviceshare/deviceshare.go +++ b/pkg/scheduler/plugins/deviceshare/deviceshare.go @@ -21,6 +21,7 @@ import ( "fmt" "math" "reflect" + "sync" v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" @@ -28,6 +29,7 @@ import ( "volcano.sh/volcano/pkg/scheduler/api" "volcano.sh/volcano/pkg/scheduler/api/devices" + "volcano.sh/volcano/pkg/scheduler/api/devices/ascend" "volcano.sh/volcano/pkg/scheduler/api/devices/ascend/ascend310p/vnpu" "volcano.sh/volcano/pkg/scheduler/api/devices/config" "volcano.sh/volcano/pkg/scheduler/api/devices/nvidia/gpushare" @@ -46,7 +48,8 @@ const ( VGPUEnable = "deviceshare.VGPUEnable" - ASCEND310PvGPU = "deviceshare.ASCEND310PVNPUEnable" + ASCEND310PvGPU = "deviceshare.ASCEND310PVGPUEnable" + AscendVNPUEnable = "deviceshare.AscendVNPUEnable" SchedulePolicyArgument = "deviceshare.SchedulePolicy" ScheduleWeight = "deviceshare.ScheduleWeight" @@ -55,6 +58,10 @@ const ( KnownGeometriesCMNamespace = "deviceshare.KnownGeometriesCMNamespace" ) +var ( + once sync.Once +) + type deviceSharePlugin struct { // Arguments given for the plugin pluginArguments framework.Arguments @@ -74,6 +81,7 @@ func (dp *deviceSharePlugin) Name() string { } func enablePredicate(dsp *deviceSharePlugin) { + klog.Infof("enablePredicate") // Checks whether predicate.GPUSharingEnable is provided or not, if given, modifies the value in predicateEnable struct. nodeLockEnable := false args := dsp.pluginArguments @@ -82,6 +90,8 @@ func enablePredicate(dsp *deviceSharePlugin) { args.GetBool(&nodeLockEnable, NodeLockEnable) args.GetBool(&vgpu.VGPUEnable, VGPUEnable) args.GetBool(&vnpu.Ascend310pvNPUEnable, ASCEND310PvGPU) + args.GetBool(&ascend.AscendVNPUEnable, AscendVNPUEnable) + klog.Infof("ascend.AscendVNPUEnable %t", ascend.AscendVNPUEnable) gpushare.NodeLockEnable = nodeLockEnable vgpu.NodeLockEnable = nodeLockEnable @@ -96,14 +106,23 @@ func enablePredicate(dsp *deviceSharePlugin) { klog.Fatal("gpu-share and vgpu can't be used together") } - if !vgpu.VGPUEnable { - return - } knownGeometriesCMName := "volcano-vgpu-device-config" args.GetString(&knownGeometriesCMName, KnownGeometriesCMName) knownGeometriesCMNamespace := "kube-system" args.GetString(&knownGeometriesCMNamespace, KnownGeometriesCMNamespace) config.InitDevicesConfig(knownGeometriesCMName, knownGeometriesCMNamespace) + registerDevices() +} + +func registerDevices() { + once.Do(func() { + if ascend.AscendVNPUEnable { + for _, vnpu := range config.GetConfig().VNPUs { + klog.Infof("register device %s", vnpu.CommonWord) + api.RegisterDevice(vnpu.CommonWord) + } + } + }) } func createStatus(code int, reason string) *api.Status { @@ -172,11 +191,14 @@ func initializeDevicesWithSession(ssn *framework.Session) { func initializeDevice(device api.Devices, ssn *framework.Session, nodeInfo *api.NodeInfo) error { switch d := device.(type) { case *vnpu.NPUDevices: - klog.V(3).Infof("initialize ascend310p device.") - return vnpu310p.InitVNPUDevice(d, ssn, nodeInfo) + if vnpu.Ascend310pvNPUEnable { + klog.V(3).Infof("initialize ascend310p device.") + return vnpu310p.InitVNPUDevice(d, ssn, nodeInfo) + } default: return nil } + return nil } func (dp *deviceSharePlugin) OnSessionOpen(ssn *framework.Session) { @@ -185,13 +207,15 @@ func (dp *deviceSharePlugin) OnSessionOpen(ssn *framework.Session) { // Register event handlers to update task info in PodLister & nodeMap ssn.AddPredicateFn(dp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) error { + klog.Infof("xxx enter PredicateFn") + defer klog.Infof("xxx leave PredicateFn") predicateStatus := make([]*api.Status, 0) // Check PredicateWithCache for _, val := range api.RegisteredDevices { if dev, ok := node.Others[val].(api.Devices); ok { if reflect.ValueOf(dev).IsNil() { // TODO When a pod requests a device of the current type, but the current node does not have such a device, an error is thrown - if dev == nil || dev.HasDeviceRequest(task.Pod) { + if dev == nil { predicateStatus = append(predicateStatus, &api.Status{ Code: devices.Unschedulable, Reason: "node not initialized with device" + val, @@ -202,8 +226,13 @@ func (dp *deviceSharePlugin) OnSessionOpen(ssn *framework.Session) { klog.V(4).Infof("pod %s/%s did not request device %s on %s, skipping it", task.Pod.Namespace, task.Pod.Name, val, node.Name) continue } + if !dev.HasDeviceRequest(task.Pod) { + klog.V(4).Infof("pod %s/%s did not request device %s on %s, skipping it", task.Pod.Namespace, task.Pod.Name, val, node.Name) + continue + } code, msg, err := dev.FilterNode(task.Pod, dp.schedulePolicy) if err != nil { + klog.V(4).Infof("pod %s/%s fit failed. device %s node %s err %v", task.Pod.Namespace, task.Pod.Name, val, node.Name, err) predicateStatus = append(predicateStatus, createStatus(code, msg)) return api.NewFitErrWithStatus(task, node, predicateStatus...) } @@ -224,7 +253,10 @@ func (dp *deviceSharePlugin) OnSessionOpen(ssn *framework.Session) { }) ssn.AddNodeOrderFn(dp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) (float64, error) { + klog.Infof("xxx enter NodeOrderFn") + defer klog.Infof("xxx leave NodeOrderFn") // DeviceScore + klog.Infof("Node: %s, task<%s/%s> NodeOrderFn", node.Name, task.Namespace, task.Name) nodeScore := float64(0) if dp.scheduleWeight > 0 { score, status := getDeviceScore(context.TODO(), task.Pod, node, dp.schedulePolicy) @@ -237,6 +269,7 @@ func (dp *deviceSharePlugin) OnSessionOpen(ssn *framework.Session) { nodeScore = float64(score) * float64(dp.scheduleWeight) klog.V(5).Infof("Node: %s, task<%s/%s> Device Score weight %d, score: %f", node.Name, task.Namespace, task.Name, dp.scheduleWeight, nodeScore) } + klog.Infof("Node: %s, task<%s/%s> NodeOrderFn end", node.Name, task.Namespace, task.Name) return nodeScore, nil }) From 91ebec760c1efe7b288cee4e8bfff7dc76aef772 Mon Sep 17 00:00:00 2001 From: root Date: Tue, 4 Nov 2025 17:13:57 +0800 Subject: [PATCH 4/7] test: add test --- .../api/devices/ascend/device_info.go | 14 +- .../api/devices/ascend/device_info_test.go | 160 ++++++++++++++++++ pkg/scheduler/api/devices/util.go | 10 ++ 3 files changed, 182 insertions(+), 2 deletions(-) create mode 100644 pkg/scheduler/api/devices/ascend/device_info_test.go diff --git a/pkg/scheduler/api/devices/ascend/device_info.go b/pkg/scheduler/api/devices/ascend/device_info.go index 12a5f65307..f46f92d32a 100644 --- a/pkg/scheduler/api/devices/ascend/device_info.go +++ b/pkg/scheduler/api/devices/ascend/device_info.go @@ -284,7 +284,7 @@ func (ads *AscendDevices) ScoreNode(pod *v1.Pod, policy string) float64 { } func (ads *AscendDevices) Allocate(kubeClient kubernetes.Interface, pod *v1.Pod) error { - klog.V(4).Infoln("hami-vnpu DeviceSharing: Into AllocateToPod", pod.Name) + klog.V(4).Infof("Allocate device %s to Pod %s", ads.Type, pod.Name) if NodeLockEnable { nodelock.UseClient(kubeClient) err := nodelock.LockNode(ads.NodeName, ads.Type) @@ -300,6 +300,11 @@ func (ads *AscendDevices) Allocate(kubeClient kubernetes.Interface, pod *v1.Pod) ads.PatchAnnotations(pod, &annotations, pod_devs) ads.addResource(annotations, pod) + annotations[devices.AssignedNodeAnnotations] = ads.NodeName + annotations[devices.AssignedTimeAnnotations] = strconv.FormatInt(time.Now().Unix(), 10) + annotations[devices.DeviceBindPhase] = "allocating" + annotations[devices.BindTimeAnnotations] = strconv.FormatInt(time.Now().Unix(), 10) + err = devices.PatchPodAnnotations(kubeClient, pod, annotations) if err != nil { return err @@ -349,6 +354,7 @@ func (ads *AscendDevices) selectDevices(pod *v1.Pod, schedulePolicy string) (dev var pod_devs devices.PodSingleDevice used_devs := make([]*AscendDevice, 0) for _, req := range reqs { + klog.Infof("req %v", req) available_devs := make([]*AscendDevice, 0) for _, dev := range dup_devs { selected := false @@ -365,7 +371,9 @@ func (ads *AscendDevices) selectDevices(pod *v1.Pod, schedulePolicy string) (dev req_nums := req.Nums selected_devs := make([]*AscendDevice, 0) for _, dev := range available_devs { + klog.Infof("xxxx check. req %v dev %v", req, dev) if fit(&req, dev) == false { + klog.Infof("fit false. req %v dev %v", req, dev) continue } selected_devs = append(selected_devs, dev) @@ -374,7 +382,8 @@ func (ads *AscendDevices) selectDevices(pod *v1.Pod, schedulePolicy string) (dev break } } - if req_nums >= 0 { + if req_nums > 0 { + klog.InfoS("xxxx req_nums > 0", "req_nums", req_nums) return nil, errors.Errorf("no enough ascend device available") } if needTopology { @@ -587,6 +596,7 @@ func (dev *AscendDevice) GenerateResourceRequests(ctr *v1.Container) devices.Con memnum = int(m) } } + klog.Infof("raw mem %v memnum %d", mem, memnum) corenum := int32(0) mempnum := 0 diff --git a/pkg/scheduler/api/devices/ascend/device_info_test.go b/pkg/scheduler/api/devices/ascend/device_info_test.go new file mode 100644 index 0000000000..df1ce061ac --- /dev/null +++ b/pkg/scheduler/api/devices/ascend/device_info_test.go @@ -0,0 +1,160 @@ +/* +Copyright 2023 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ascend + +import ( + "fmt" + "testing" + "github.com/stretchr/testify/assert" + "gopkg.in/yaml.v2" + //"volcano.sh/volcano/pkg/scheduler/api/devices" + "volcano.sh/volcano/pkg/scheduler/api/devices/config" +) + +var config_yaml = ` +vnpus: +- chipName: 310P3 + + commonWord: Ascend310P + resourceName: huawei.com/Ascend310P + resourceMemoryName: huawei.com/Ascend310P-memory + memoryAllocatable: 21527 + memoryCapacity: 24576 + aiCore: 8 + aiCPU: 7 + templates: + - name: vir01 + memory: 3072 + aiCore: 1 + aiCPU: 1 + - name: vir02 + memory: 6144 + aiCore: 2 + aiCPU: 2 + - name: vir04 + memory: 12288 + aiCore: 4 + aiCPU: 4 +- chipName: 910B3 + commonWord: Ascend910B3 + resourceName: huawei.com/Ascend910B3 + resourceMemoryName: huawei.com/Ascend910B3-memory + memoryAllocatable: 65536 + memoryCapacity: 65536 + aiCore: 20 + aiCPU: 7 + templates: + - name: vir05_1c_16g + memory: 16384 + aiCore: 5 + aiCPU: 1 + - name: vir10_3c_32g + memory: 32768 + aiCore: 10 + aiCPU: 3 +nvidia: + resourceCountName: volcano.sh/vgpu-number + resourceMemoryName: volcano.sh/vgpu-memory + resourceMemoryPercentageName: volcano.sh/vgpu-memory-percentage + resourceCoreName: volcano.sh/vgpu-cores + overwriteEnv: false + defaultMemory: 0 + defaultCores: 0 + defaultGPUNum: 1 + deviceSplitCount: 10 + deviceMemoryScaling: 1 + deviceCoreScaling: 1 + gpuMemoryFactor: 1 + knownMigGeometries: + - models: [ "A30" ] + allowedGeometries: + - group: group1 + geometries: + - name: 1g.6gb + memory: 6144 + count: 4 + - group: group2 + geometries: + - name: 2g.12gb + memory: 12288 + count: 2 + - group: group3 + geometries: + - name: 4g.24gb + memory: 24576 + count: 1 +` + +func yamlStringToConfig(yamlStr string) (*config.Config, error) { + var config config.Config + err := yaml.Unmarshal([]byte(yamlStr), &config) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal YAML: %v", err) + } + return &config, nil +} + +func Test_trimMemory(t *testing.T) { + _, err := yamlStringToConfig(config_yaml) + conf, err := yamlStringToConfig(config_yaml) + assert.Nil(t, err) + dev := AscendDevice{ + config: conf.VNPUs[0], + } + tests := []struct { + name string + inputMem int64 + wantMem int64 + }{ + {"test1", 0, 3072}, + {"test2", 1, 3072}, + {"test3", 100, 3072}, + {"test4", 3071, 3072}, + {"test5", 3072, 3072}, + {"test6", 3073, 6144}, + {"test7", 6143, 6144}, + {"test8", 6144, 6144}, + {"test9", 6144, 6144}, + {"test10", 6145, 12288}, + {"test11", 12288, 12288}, + {"test12", 12289, 21527}, + {"test13", 21527, 21527}, + {"test14", 21528, 21527}, + {"test15", 24576, 21527}, + {"test16", 24577, 0}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, _ := dev.trimMemory(tt.inputMem) + assert.Equal(t, tt.wantMem, got) + }) + } +} diff --git a/pkg/scheduler/api/devices/util.go b/pkg/scheduler/api/devices/util.go index cd45516332..e04499e512 100644 --- a/pkg/scheduler/api/devices/util.go +++ b/pkg/scheduler/api/devices/util.go @@ -68,6 +68,16 @@ const ( Skip ) +const ( + AssignedNodeAnnotations = "hami.io/vgpu-node" + AssignedTimeAnnotations = "hami.io/vgpu-time" + BindTimeAnnotations = "hami.io/bind-time" + DeviceBindPhase = "hami.io/bind-phase" + DeviceBindAllocating = "allocating" + DeviceBindFailed = "failed" + DeviceBindSuccess = "success" +) + var kubeClient *kubernetes.Clientset func GetClient() kubernetes.Interface { From 5d1bac1bfde6b84cda41b0d8b8936843637cfacd Mon Sep 17 00:00:00 2001 From: root Date: Wed, 5 Nov 2025 18:15:53 +0800 Subject: [PATCH 5/7] fix: addresource --- .../api/devices/ascend/device_info.go | 73 ++++++++++++------- pkg/scheduler/api/node_info.go | 14 ++++ .../plugins/deviceshare/deviceshare.go | 4 - 3 files changed, 62 insertions(+), 29 deletions(-) diff --git a/pkg/scheduler/api/devices/ascend/device_info.go b/pkg/scheduler/api/devices/ascend/device_info.go index f46f92d32a..0c24c05fb8 100644 --- a/pkg/scheduler/api/devices/ascend/device_info.go +++ b/pkg/scheduler/api/devices/ascend/device_info.go @@ -102,9 +102,9 @@ func NewAscendDevices(name string, node *v1.Node) map[string]*AscendDevices { if cur_config == nil { klog.InfoS("cur config is null. call InitDevicesConfig") config.InitDevicesConfig(CMName, CMNamespace) + cur_config = config.GetConfig() } - devs := InitDevices(config.GetConfig().VNPUs) - klog.Infof("NewAscendDevices. dev len %d", len(devs)) + devs := InitDevices(cur_config.VNPUs) for _, dev := range devs { node_devices, err := dev.GetNodeDevices(*node) if err != nil { @@ -117,19 +117,40 @@ func NewAscendDevices(name string, node *v1.Node) map[string]*AscendDevices { Devices: make(map[string]*AscendDevice), } for _, nd := range node_devices { - dev.DeviceInfo = nd - dev.DeviceUsage = &devices.DeviceUsage{ - Used: 0, - Usedmem: 0, - Usedcores: 0, + cur_dev := &AscendDevice{ + config: dev.config, + nodeRegisterAnno: dev.nodeRegisterAnno, + useUUIDAnno: dev.useUUIDAnno, + noUseUUIDAnno: dev.noUseUUIDAnno, + handshakeAnno: dev.handshakeAnno, + DeviceInfo: nd, + DeviceUsage: &devices.DeviceUsage{ + Used: 0, + Usedmem: 0, + Usedcores: 0, + }, } - as_devices.Devices[nd.ID] = dev + as_devices.Devices[nd.ID] = cur_dev + klog.V(5).Infof("add device. ID %s dev_info %+v", cur_dev.DeviceInfo.ID, cur_dev.DeviceInfo) } ascend_devices[dev.CommonWord()] = as_devices } return ascend_devices } +func GetAscendDeviceNames() ([]string){ + cur_config := config.GetConfig() + if cur_config == nil { + config.InitDevicesConfig(CMName, CMNamespace) + cur_config = config.GetConfig() + } + deviceNames := make([]string, 0, len(cur_config.VNPUs)) + for _, vnpu := range cur_config.VNPUs { + deviceNames = append(deviceNames, vnpu.CommonWord) + } + return deviceNames +} + func (ads *AscendDevices) AddResourceUsage(id string, cores int32, mem int32) error { dev, ok := ads.Devices[id] if !ok { @@ -153,7 +174,6 @@ func (ads *AscendDevices) SubResourceUsage(id string, cores int32, mem int32) er } func (ads *AscendDevices) AddResource(pod *v1.Pod) { - klog.Infof("AscendDevices AddResource") if ads == nil { return } @@ -167,7 +187,6 @@ func (ads *AscendDevices) SubResource(pod *v1.Pod) { ano_key := devices.InRequestDevices[ads.Type] ano, ok := pod.Annotations[ano_key] if !ok { - klog.Errorf("pod %s has no annotation %s", pod.Name, ano_key) return } con_devs, err := devices.DecodeContainerDevices(ano) @@ -184,7 +203,6 @@ func (ads *AscendDevices) addResource(annotations map[string]string, pod *v1.Pod ano_key := devices.InRequestDevices[ads.Type] ano, ok := annotations[ano_key] if !ok { - klog.Errorf("pod %s has no annotation %s", pod.Name, ano_key) return } con_devs, err := devices.DecodeContainerDevices(ano) @@ -205,7 +223,6 @@ func (ads *AscendDevices) HasDeviceRequest(pod *v1.Pod) bool { if !AscendVNPUEnable { return false } - klog.Infof("xxx %s check HasDeviceRequest", ads.Type) rand_dev, err := ads.getRandomDevice() if rand_dev == nil || err != nil { return false @@ -214,16 +231,16 @@ func (ads *AscendDevices) HasDeviceRequest(pod *v1.Pod) bool { for _, container := range pod.Spec.Containers { _, ok := container.Resources.Limits[v1.ResourceName(vnpu_config.ResourceName)] if ok { - klog.Infof("xxx %s check HasDeviceRequest ok. %s", ads.Type, vnpu_config.ResourceName) + klog.V(5).Infof("%s check HasDeviceRequest ok. %s", ads.Type, vnpu_config.ResourceName) return true } _, ok = container.Resources.Limits[v1.ResourceName(vnpu_config.ResourceMemoryName)] if ok { - klog.Infof("xxx %s check HasDeviceRequest ok. %s", ads.Type, vnpu_config.ResourceMemoryName) + klog.V(5).Infof("%s check HasDeviceRequest ok. %s", ads.Type, vnpu_config.ResourceMemoryName) return true } } - klog.Infof("xxx %s check HasDeviceRequest false", ads.Type) + klog.V(5).Infof("%s check HasDeviceRequest false", ads.Type) return false } @@ -312,7 +329,7 @@ func (ads *AscendDevices) Allocate(kubeClient kubernetes.Interface, pod *v1.Pod) if NodeLockEnable { nodelock.ReleaseNodeLock(ads.NodeName, ads.Type) } - klog.V(3).Infoln("Allocate Success") + klog.V(4).Infof("Allocate Success. device %s Pod %s", ads.Type, pod.Name) return nil } @@ -326,7 +343,6 @@ func (ads *AscendDevices) GetIgnoredDevices() []string { return []string{""} } vnpu_config := rand_dev.config - klog.Infof("IgnoredDevices %s", vnpu_config.ResourceMemoryName) return []string{vnpu_config.ResourceMemoryName} } @@ -354,7 +370,7 @@ func (ads *AscendDevices) selectDevices(pod *v1.Pod, schedulePolicy string) (dev var pod_devs devices.PodSingleDevice used_devs := make([]*AscendDevice, 0) for _, req := range reqs { - klog.Infof("req %v", req) + klog.V(5).Infof("req %+v", req) available_devs := make([]*AscendDevice, 0) for _, dev := range dup_devs { selected := false @@ -371,9 +387,9 @@ func (ads *AscendDevices) selectDevices(pod *v1.Pod, schedulePolicy string) (dev req_nums := req.Nums selected_devs := make([]*AscendDevice, 0) for _, dev := range available_devs { - klog.Infof("xxxx check. req %v dev %v", req, dev) + klog.V(5).Infof("check fit. req %+v dev_info %+v dev_usage %+v", req, dev.DeviceInfo, dev.DeviceUsage) if fit(&req, dev) == false { - klog.Infof("fit false. req %v dev %v", req, dev) + klog.V(5).Infof("fit false. dev ID %s", dev.DeviceInfo.ID) continue } selected_devs = append(selected_devs, dev) @@ -383,7 +399,7 @@ func (ads *AscendDevices) selectDevices(pod *v1.Pod, schedulePolicy string) (dev } } if req_nums > 0 { - klog.InfoS("xxxx req_nums > 0", "req_nums", req_nums) + klog.V(5).Infof("no enough ascend device available! raw req_nums %d cur req_nums %d", req.Nums, req_nums) return nil, errors.Errorf("no enough ascend device available") } if needTopology { @@ -442,7 +458,7 @@ func fit(req *devices.ContainerDeviceRequest, dev *AscendDevice) bool { } func getDeviceSnapshot(ads *AscendDevices) []*AscendDevice { - dup_devs := make([]*AscendDevice, 0) + dup_devs := make([]*AscendDevice, 0, len(ads.Devices)) for _, dev := range ads.Devices { dup_dev := &AscendDevice{ config: dev.config, @@ -581,9 +597,7 @@ func (dev *AscendDevice) GenerateResourceRequests(ctr *v1.Container) devices.Con v, ok = ctr.Resources.Requests[ascendResourceCount] } if ok { - klog.V(3).Infof("Counting %s devices", dev.config.CommonWord) if n, ok := v.AsInt64(); ok { - klog.Info("Found AscendDevices devices") memnum := 0 mem, ok := ctr.Resources.Limits[ascendResourceMem] if !ok { @@ -595,8 +609,8 @@ func (dev *AscendDevice) GenerateResourceRequests(ctr *v1.Container) devices.Con m, _ := dev.trimMemory(memnums) memnum = int(m) } + klog.V(5).Infof("raw mem %d memnum %d", memnums, memnum) } - klog.Infof("raw mem %v memnum %d", mem, memnum) corenum := int32(0) mempnum := 0 @@ -604,6 +618,15 @@ func (dev *AscendDevice) GenerateResourceRequests(ctr *v1.Container) devices.Con mempnum = 100 } + if corenum > 100 { + klog.ErrorS(nil, "core limit can't exceed 100", "device", dev.config.CommonWord) + corenum = 100 + } + if mempnum != 0 && memnum == 0 { + memnum = int(dev.DeviceInfo.Devmem) * mempnum / 100 + klog.V(5).Infof("new memreq %d totalmem %d mempercentage %d", memnum, dev.DeviceInfo.Devmem, mempnum) + } + return devices.ContainerDeviceRequest{ Nums: int32(n), Type: dev.CommonWord(), diff --git a/pkg/scheduler/api/node_info.go b/pkg/scheduler/api/node_info.go index b8b44e6153..3f9e3ccaf2 100644 --- a/pkg/scheduler/api/node_info.go +++ b/pkg/scheduler/api/node_info.go @@ -514,6 +514,13 @@ func (ni *NodeInfo) addResource(pod *v1.Pod) { } ni.Others[vgpu.DeviceName].(Devices).AddResource(pod) ni.Others[vnpu.DeviceName].(Devices).AddResource(pod) + for _, name := range ascend.GetAscendDeviceNames() { + if other, exists := ni.Others[name]; exists { + if devices, ok := other.(Devices); ok { + devices.AddResource(pod) + } + } + } } // subResource is used to subtract sharable devices @@ -523,6 +530,13 @@ func (ni *NodeInfo) subResource(pod *v1.Pod) { } ni.Others[vgpu.DeviceName].(Devices).SubResource(pod) ni.Others[vnpu.DeviceName].(Devices).SubResource(pod) + for _, name := range ascend.GetAscendDeviceNames() { + if other, exists := ni.Others[name]; exists { + if devices, ok := other.(Devices); ok { + devices.SubResource(pod) + } + } + } } // UpdateTask is used to update a task in nodeInfo object. diff --git a/pkg/scheduler/plugins/deviceshare/deviceshare.go b/pkg/scheduler/plugins/deviceshare/deviceshare.go index cbff019973..d3ba47b538 100644 --- a/pkg/scheduler/plugins/deviceshare/deviceshare.go +++ b/pkg/scheduler/plugins/deviceshare/deviceshare.go @@ -207,8 +207,6 @@ func (dp *deviceSharePlugin) OnSessionOpen(ssn *framework.Session) { // Register event handlers to update task info in PodLister & nodeMap ssn.AddPredicateFn(dp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) error { - klog.Infof("xxx enter PredicateFn") - defer klog.Infof("xxx leave PredicateFn") predicateStatus := make([]*api.Status, 0) // Check PredicateWithCache for _, val := range api.RegisteredDevices { @@ -253,8 +251,6 @@ func (dp *deviceSharePlugin) OnSessionOpen(ssn *framework.Session) { }) ssn.AddNodeOrderFn(dp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) (float64, error) { - klog.Infof("xxx enter NodeOrderFn") - defer klog.Infof("xxx leave NodeOrderFn") // DeviceScore klog.Infof("Node: %s, task<%s/%s> NodeOrderFn", node.Name, task.Namespace, task.Name) nodeScore := float64(0) From 733ef0ee280619ca94c48e3825d880c870e401e9 Mon Sep 17 00:00:00 2001 From: root Date: Thu, 6 Nov 2025 16:35:06 +0800 Subject: [PATCH 6/7] test: add test case --- .../api/devices/ascend/device_info.go | 2 +- .../api/devices/ascend/device_info_test.go | 117 +++++++++++++++++- 2 files changed, 116 insertions(+), 3 deletions(-) diff --git a/pkg/scheduler/api/devices/ascend/device_info.go b/pkg/scheduler/api/devices/ascend/device_info.go index 0c24c05fb8..80b2be282b 100644 --- a/pkg/scheduler/api/devices/ascend/device_info.go +++ b/pkg/scheduler/api/devices/ascend/device_info.go @@ -1,5 +1,5 @@ /* -Copyright 2023 The Volcano Authors. +Copyright 2025 The Volcano Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/pkg/scheduler/api/devices/ascend/device_info_test.go b/pkg/scheduler/api/devices/ascend/device_info_test.go index df1ce061ac..eebb04ee43 100644 --- a/pkg/scheduler/api/devices/ascend/device_info_test.go +++ b/pkg/scheduler/api/devices/ascend/device_info_test.go @@ -1,5 +1,5 @@ /* -Copyright 2023 The Volcano Authors. +Copyright 2025 The Volcano Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -35,7 +35,7 @@ import ( "testing" "github.com/stretchr/testify/assert" "gopkg.in/yaml.v2" - //"volcano.sh/volcano/pkg/scheduler/api/devices" + "volcano.sh/volcano/pkg/scheduler/api/devices" "volcano.sh/volcano/pkg/scheduler/api/devices/config" ) @@ -158,3 +158,116 @@ func Test_trimMemory(t *testing.T) { }) } } + +func Test_fit(t *testing.T) { + _, err := yamlStringToConfig(config_yaml) + conf, err := yamlStringToConfig(config_yaml) + assert.Nil(t, err) + device_info := &devices.DeviceInfo{ + ID: "68496E64-20E05477-92C31323-6E78030A-BD003019", + Index: 0, + Count: 7, + Devcore: 8, + Devmem: 21527, + } + tests := []struct { + name string + req *devices.ContainerDeviceRequest + dev *AscendDevice + result bool + }{ + { + "test1", + &devices.ContainerDeviceRequest { + Nums: 1, + Type: "Ascend310P", + Memreq: 1024, + }, + &AscendDevice { + config: conf.VNPUs[0], + DeviceInfo: device_info, + DeviceUsage: &devices.DeviceUsage{ + Used: 1, + Usedmem: 3072, + }, + }, + true, + }, + { + "test2", + &devices.ContainerDeviceRequest { + Nums: 1, + Type: "Ascend310P", + Memreq: 21527, + }, + &AscendDevice { + config: conf.VNPUs[0], + DeviceInfo: device_info, + DeviceUsage: &devices.DeviceUsage{ + Used: 1, + Usedmem: 3072, + }, + }, + false, + }, + { + "test3", + &devices.ContainerDeviceRequest { + Nums: 1, + Type: "Ascend310P", + Memreq: 6144, + }, + &AscendDevice { + config: conf.VNPUs[0], + DeviceInfo: device_info, + DeviceUsage: &devices.DeviceUsage{ + Used: 1, + Usedmem: 12288, + }, + }, + true, + }, + { + "test4", + &devices.ContainerDeviceRequest { + Nums: 1, + Type: "Ascend310P", + Memreq: 24576, + }, + &AscendDevice { + config: conf.VNPUs[0], + DeviceInfo: device_info, + DeviceUsage: &devices.DeviceUsage{ + Used: 0, + Usedmem: 0, + }, + }, + false, + }, + { + "test5_core", + &devices.ContainerDeviceRequest { + Nums: 1, + Type: "Ascend310P", + Memreq: 6144, + Coresreq: 4, + }, + &AscendDevice { + config: conf.VNPUs[0], + DeviceInfo: device_info, + DeviceUsage: &devices.DeviceUsage{ + Used: 1, + Usedmem: 12288, + Usedcores: 6, + }, + }, + false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ret := fit(tt.req, tt.dev) + assert.Equal(t, tt.result, ret) + }) + } +} From c3fb5b3fa94a4edd319587e47b3143a8b32647b5 Mon Sep 17 00:00:00 2001 From: root Date: Thu, 6 Nov 2025 16:57:31 +0800 Subject: [PATCH 7/7] refact: update log --- pkg/scheduler/api/devices/ascend/device_info.go | 2 +- pkg/scheduler/plugins/deviceshare/deviceshare.go | 14 +++----------- 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/pkg/scheduler/api/devices/ascend/device_info.go b/pkg/scheduler/api/devices/ascend/device_info.go index 80b2be282b..2d4b30410c 100644 --- a/pkg/scheduler/api/devices/ascend/device_info.go +++ b/pkg/scheduler/api/devices/ascend/device_info.go @@ -100,7 +100,7 @@ func NewAscendDevices(name string, node *v1.Node) map[string]*AscendDevices { } cur_config := config.GetConfig() if cur_config == nil { - klog.InfoS("cur config is null. call InitDevicesConfig") + klog.V(5).InfoS("cur config is null. call InitDevicesConfig") config.InitDevicesConfig(CMName, CMNamespace) cur_config = config.GetConfig() } diff --git a/pkg/scheduler/plugins/deviceshare/deviceshare.go b/pkg/scheduler/plugins/deviceshare/deviceshare.go index d3ba47b538..d574c4ab48 100644 --- a/pkg/scheduler/plugins/deviceshare/deviceshare.go +++ b/pkg/scheduler/plugins/deviceshare/deviceshare.go @@ -81,7 +81,6 @@ func (dp *deviceSharePlugin) Name() string { } func enablePredicate(dsp *deviceSharePlugin) { - klog.Infof("enablePredicate") // Checks whether predicate.GPUSharingEnable is provided or not, if given, modifies the value in predicateEnable struct. nodeLockEnable := false args := dsp.pluginArguments @@ -91,7 +90,6 @@ func enablePredicate(dsp *deviceSharePlugin) { args.GetBool(&vgpu.VGPUEnable, VGPUEnable) args.GetBool(&vnpu.Ascend310pvNPUEnable, ASCEND310PvGPU) args.GetBool(&ascend.AscendVNPUEnable, AscendVNPUEnable) - klog.Infof("ascend.AscendVNPUEnable %t", ascend.AscendVNPUEnable) gpushare.NodeLockEnable = nodeLockEnable vgpu.NodeLockEnable = nodeLockEnable @@ -137,16 +135,12 @@ func getDeviceScore(ctx context.Context, pod *v1.Pod, node *api.NodeInfo, schedu s := float64(0) for deviceType, device := range node.Others { if device.(api.Devices).HasDeviceRequest(pod) { - var ns float64 // Only process device types that use NodeOrderFn (vgpu and gpushare) // vnpu devices use BatchNodeOrderFn, skip them here - if deviceType == vgpu.DeviceName || deviceType == gpushare.DeviceName { - ns = device.(api.Devices).ScoreNode(pod, schedulePolicy) - } else { - // Other device types (like vnpu) use BatchNodeOrderFn, skip scoring here - continue + if deviceType != vnpu.NPUDevices { + ns := device.(api.Devices).ScoreNode(pod, schedulePolicy) + s += ns } - s += ns } } klog.V(4).Infof("deviceScore for task %s/%s is: %v", pod.Namespace, pod.Name, s) @@ -252,7 +246,6 @@ func (dp *deviceSharePlugin) OnSessionOpen(ssn *framework.Session) { ssn.AddNodeOrderFn(dp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) (float64, error) { // DeviceScore - klog.Infof("Node: %s, task<%s/%s> NodeOrderFn", node.Name, task.Namespace, task.Name) nodeScore := float64(0) if dp.scheduleWeight > 0 { score, status := getDeviceScore(context.TODO(), task.Pod, node, dp.schedulePolicy) @@ -265,7 +258,6 @@ func (dp *deviceSharePlugin) OnSessionOpen(ssn *framework.Session) { nodeScore = float64(score) * float64(dp.scheduleWeight) klog.V(5).Infof("Node: %s, task<%s/%s> Device Score weight %d, score: %f", node.Name, task.Namespace, task.Name, dp.scheduleWeight, nodeScore) } - klog.Infof("Node: %s, task<%s/%s> NodeOrderFn end", node.Name, task.Namespace, task.Name) return nodeScore, nil })