diff --git a/pkg/scheduler/api/devices/ascend/device_info.go b/pkg/scheduler/api/devices/ascend/device_info.go new file mode 100644 index 0000000000..2d4b30410c --- /dev/null +++ b/pkg/scheduler/api/devices/ascend/device_info.go @@ -0,0 +1,704 @@ +/* +Copyright 2025 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ascend + +import ( + "encoding/json" + "flag" + "fmt" + "sort" + "strconv" + "strings" + "time" + + "github.com/pkg/errors" + + v1 "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" + "volcano.sh/volcano/pkg/scheduler/api/devices" + "volcano.sh/volcano/pkg/scheduler/api/devices/config" + "volcano.sh/volcano/pkg/scheduler/plugins/util/nodelock" +) + +const ( + NodeLockAscend = "hami.io/mutex.lock" + Ascend910Prefix = "Ascend910" + Ascend910NetworkWeight = 10 + // binpack means the lower device memory remained after this allocation, the better + binpackPolicy = "binpack" + // spread means better put this task into an idle GPU card than a shared GPU card + spreadPolicy = "spread" + binpackMultiplier = 100 + spreadMultiplier = 100 + CMName = "volcano-vgpu-device-config" + CMNamespace = "kube-system" +) + +type AscendDevice struct { + config config.VNPUConfig + nodeRegisterAnno string + useUUIDAnno string + noUseUUIDAnno string + handshakeAnno string + DeviceInfo *devices.DeviceInfo + DeviceUsage *devices.DeviceUsage + Score float64 +} + +type AscendDevices struct { + NodeName string + Type string + Devices map[string]*AscendDevice + Policy string +} + +type RuntimeInfo struct { + UUID string `json:"UUID,omitempty"` + Temp string `json:"temp,omitempty"` +} + +var ( + AscendVNPUEnable bool + configFile string + NodeLockEnable bool +) + +func NewAscendDevices(name string, node *v1.Node) map[string]*AscendDevices { + ascend_devices := make(map[string]*AscendDevices) + if node == nil { + klog.Warningf("Node is nil for node %s, returning empty AscendDevices", name) + return ascend_devices + } + cur_config := config.GetConfig() + if cur_config == nil { + klog.V(5).InfoS("cur config is null. call InitDevicesConfig") + config.InitDevicesConfig(CMName, CMNamespace) + cur_config = config.GetConfig() + } + devs := InitDevices(cur_config.VNPUs) + for _, dev := range devs { + node_devices, err := dev.GetNodeDevices(*node) + if err != nil { + klog.Warningf("Failed to get node devices. 
nodeName %s, deviceType %s, error %s" , node.Name, dev.CommonWord(), err) + continue + } + as_devices := &AscendDevices{ + NodeName: name, + Type: dev.CommonWord(), + Devices: make(map[string]*AscendDevice), + } + for _, nd := range node_devices { + cur_dev := &AscendDevice{ + config: dev.config, + nodeRegisterAnno: dev.nodeRegisterAnno, + useUUIDAnno: dev.useUUIDAnno, + noUseUUIDAnno: dev.noUseUUIDAnno, + handshakeAnno: dev.handshakeAnno, + DeviceInfo: nd, + DeviceUsage: &devices.DeviceUsage{ + Used: 0, + Usedmem: 0, + Usedcores: 0, + }, + } + as_devices.Devices[nd.ID] = cur_dev + klog.V(5).Infof("add device. ID %s dev_info %+v", cur_dev.DeviceInfo.ID, cur_dev.DeviceInfo) + } + ascend_devices[dev.CommonWord()] = as_devices + } + return ascend_devices +} + +func GetAscendDeviceNames() ([]string){ + cur_config := config.GetConfig() + if cur_config == nil { + config.InitDevicesConfig(CMName, CMNamespace) + cur_config = config.GetConfig() + } + deviceNames := make([]string, 0, len(cur_config.VNPUs)) + for _, vnpu := range cur_config.VNPUs { + deviceNames = append(deviceNames, vnpu.CommonWord) + } + return deviceNames +} + +func (ads *AscendDevices) AddResourceUsage(id string, cores int32, mem int32) error { + dev, ok := ads.Devices[id] + if !ok { + return fmt.Errorf("ascend device %s not found", id) + } + dev.DeviceUsage.Used++ + dev.DeviceUsage.Usedcores += cores + dev.DeviceUsage.Usedmem += mem + return nil +} + +func (ads *AscendDevices) SubResourceUsage(id string, cores int32, mem int32) error { + dev, ok := ads.Devices[id] + if !ok { + return fmt.Errorf("ascend device %s not found", id) + } + dev.DeviceUsage.Used-- + dev.DeviceUsage.Usedcores -= cores + dev.DeviceUsage.Usedmem -= mem + return nil +} + +func (ads *AscendDevices) AddResource(pod *v1.Pod) { + if ads == nil { + return + } + ads.addResource(pod.Annotations, pod) +} + +func (ads *AscendDevices) SubResource(pod *v1.Pod) { + if ads == nil { + return + } + ano_key := devices.InRequestDevices[ads.Type] + ano, ok := pod.Annotations[ano_key] + if !ok { + return + } + con_devs, err := devices.DecodeContainerDevices(ano) + if err != nil { + klog.ErrorS(err, "failed to decode container devices", "pod", pod.Name, "annotation", ano) + return + } + for _, cono_dev := range con_devs { + ads.SubResourceUsage(cono_dev.UUID, cono_dev.Usedcores, cono_dev.Usedmem) + } +} + +func (ads *AscendDevices) addResource(annotations map[string]string, pod *v1.Pod) { + ano_key := devices.InRequestDevices[ads.Type] + ano, ok := annotations[ano_key] + if !ok { + return + } + con_devs, err := devices.DecodeContainerDevices(ano) + if err != nil { + klog.ErrorS(err, "failed to decode container devices", "pod", pod.Name, "annotation", ano) + return + } + for _, cono_dev := range con_devs { + ads.AddResourceUsage(cono_dev.UUID, cono_dev.Usedcores, cono_dev.Usedmem) + } +} + +func (ads *AscendDevices) AddQueueResource(pod *v1.Pod) map[string]float64 { + return map[string]float64{} +} + +func (ads *AscendDevices) HasDeviceRequest(pod *v1.Pod) bool { + if !AscendVNPUEnable { + return false + } + rand_dev, err := ads.getRandomDevice() + if rand_dev == nil || err != nil { + return false + } + var vnpu_config = rand_dev.config + for _, container := range pod.Spec.Containers { + _, ok := container.Resources.Limits[v1.ResourceName(vnpu_config.ResourceName)] + if ok { + klog.V(5).Infof("%s check HasDeviceRequest ok. 
%s", ads.Type, vnpu_config.ResourceName) + return true + } + _, ok = container.Resources.Limits[v1.ResourceName(vnpu_config.ResourceMemoryName)] + if ok { + klog.V(5).Infof("%s check HasDeviceRequest ok. %s", ads.Type, vnpu_config.ResourceMemoryName) + return true + } + } + klog.V(5).Infof("%s check HasDeviceRequest false", ads.Type) + return false +} + +func (ads *AscendDevices) FilterNode(pod *v1.Pod, policy string) (int, string, error) { + _, err := ads.selectDevices(pod, policy) + if err != nil { + return devices.Error, "no ascend device available", err + } + klog.V(4).Infoln("ascend DeviceSharing successfully filters pods. device_type:", ads.Type) + return devices.Success, "", nil +} + +func (ads *AscendDevices) ScoreNode(pod *v1.Pod, policy string) float64 { + ads.Policy = policy + pod_devs, err := ads.selectDevices(pod, policy) + if err != nil { + return 0 + } + score := 0.0 + var used_devs []*AscendDevice + for _, dev := range pod_devs { + dev, ok := ads.Devices[dev[0].UUID] + if !ok { + return 0 + } + used_devs = append(used_devs, dev) + score += CalScore(policy, dev.DeviceUsage, dev.DeviceInfo) + } + + if strings.HasPrefix(ads.Type, Ascend910Prefix) && hasNetworkID(used_devs) { + klog.V(4).Infof("all devices have NetworkID. device CommonWord %s", ads.Type) + cntMap := make(map[int]int) + for _, dev := range used_devs { + if dev.DeviceInfo.CustomInfo == nil { + return 0 + } + if networkID, ok := dev.DeviceInfo.CustomInfo["NetworkID"]; ok { + if id, ok := networkID.(float64); ok { + cntMap[int(id)]++ + } + } else { + return 0 + } + } + maxCnt, totalCnt := 0, 0 + for _, cnt := range cntMap { + if cnt > maxCnt { + maxCnt = cnt + } + totalCnt += cnt + } + if totalCnt == 0 { + return 0 + } + score += (float64(maxCnt) / float64(totalCnt)) * Ascend910NetworkWeight + } + return score +} + +func (ads *AscendDevices) Allocate(kubeClient kubernetes.Interface, pod *v1.Pod) error { + klog.V(4).Infof("Allocate device %s to Pod %s", ads.Type, pod.Name) + if NodeLockEnable { + nodelock.UseClient(kubeClient) + err := nodelock.LockNode(ads.NodeName, ads.Type) + if err != nil { + return errors.Errorf("node %s locked for %s hamivgpu lockname %s", ads.NodeName, pod.Name, err.Error()) + } + } + pod_devs, err := ads.selectDevices(pod, ads.Policy) + if err != nil { + return errors.Errorf("failed to select ascend devices for pod %s: %v", pod.Name, err) + } + annotations := make(map[string]string) + ads.PatchAnnotations(pod, &annotations, pod_devs) + + ads.addResource(annotations, pod) + annotations[devices.AssignedNodeAnnotations] = ads.NodeName + annotations[devices.AssignedTimeAnnotations] = strconv.FormatInt(time.Now().Unix(), 10) + annotations[devices.DeviceBindPhase] = "allocating" + annotations[devices.BindTimeAnnotations] = strconv.FormatInt(time.Now().Unix(), 10) + + err = devices.PatchPodAnnotations(kubeClient, pod, annotations) + if err != nil { + return err + } + if NodeLockEnable { + nodelock.ReleaseNodeLock(ads.NodeName, ads.Type) + } + klog.V(4).Infof("Allocate Success. 
device %s Pod %s", ads.Type, pod.Name) + return nil +} + +func (ads *AscendDevices) Release(kubeClient kubernetes.Interface, pod *v1.Pod) error { + return nil +} + +func (ads *AscendDevices) GetIgnoredDevices() []string { + rand_dev, err := ads.getRandomDevice() + if rand_dev == nil || err != nil { + return []string{""} + } + vnpu_config := rand_dev.config + return []string{vnpu_config.ResourceMemoryName} +} + +func (ads *AscendDevices) GetStatus() string { + return "" +} + +func (ads *AscendDevices) selectDevices(pod *v1.Pod, schedulePolicy string) (devices.PodSingleDevice, error) { + dup_devs := getDeviceSnapshot(ads) + if len(dup_devs) == 0 { + return nil, errors.Errorf("no ascend device available") + } + for _, dev := range dup_devs { + dev.Score = CalScore(schedulePolicy, dev.DeviceUsage, dev.DeviceInfo) + } + sort.Slice(dup_devs, func(i, j int) bool { + return dup_devs[i].Score > dup_devs[j].Score + }) + needTopology := false + if strings.HasPrefix(ads.Type, Ascend910Prefix) && hasNetworkID(dup_devs) { + klog.V(4).Infof("all devices have NetworkID. device CommonWord %s", ads.Type) + needTopology = true + } + reqs := dup_devs[0].ResourceReqs(pod) + var pod_devs devices.PodSingleDevice + used_devs := make([]*AscendDevice, 0) + for _, req := range reqs { + klog.V(5).Infof("req %+v", req) + available_devs := make([]*AscendDevice, 0) + for _, dev := range dup_devs { + selected := false + for _, used_dev := range used_devs { + if used_dev.DeviceInfo.ID == dev.DeviceInfo.ID { + selected = true + break + } + } + if !selected { + available_devs = append(available_devs, dev) + } + } + req_nums := req.Nums + selected_devs := make([]*AscendDevice, 0) + for _, dev := range available_devs { + klog.V(5).Infof("check fit. req %+v dev_info %+v dev_usage %+v", req, dev.DeviceInfo, dev.DeviceUsage) + if fit(&req, dev) == false { + klog.V(5).Infof("fit false. dev ID %s", dev.DeviceInfo.ID) + continue + } + selected_devs = append(selected_devs, dev) + req_nums -= 1 + if req_nums <= 0 && !needTopology { + break + } + } + if req_nums > 0 { + klog.V(5).Infof("no enough ascend device available! raw req_nums %d cur req_nums %d", req.Nums, req_nums) + return nil, errors.Errorf("no enough ascend device available") + } + if needTopology { + selected_devs = selectDevicesWithTopology(int(req.Nums), selected_devs) + } + used_devs = append(used_devs, selected_devs...) 
+ var con_devs devices.ContainerDevices + for _, dev := range selected_devs { + con_devs = append(con_devs, devices.ContainerDevice{ + UUID: dev.DeviceInfo.ID, + Type: ads.Type, + Usedmem: req.Memreq, + Usedcores: req.Coresreq, + CustomInfo: dev.DeviceInfo.CustomInfo, + }) + } + pod_devs = append(pod_devs, con_devs) + } + return pod_devs, nil +} + +func hasNetworkID(devices []*AscendDevice) bool { + for _, dev := range devices { + if dev.DeviceInfo.CustomInfo == nil { + return false + } + if _, ok := dev.DeviceInfo.CustomInfo["NetworkID"]; !ok { + return false + } + } + return true +} + +func fit(req *devices.ContainerDeviceRequest, dev *AscendDevice) bool { + if req.Type != dev.config.CommonWord { + return false + } + device_usage := dev.DeviceUsage + device_info := dev.DeviceInfo + if device_info.Count < device_usage.Used { + return false + } + if device_info.Devmem-device_usage.Usedmem < req.Memreq { + return false + } + if device_info.Devcore-device_usage.Usedcores < req.Coresreq { + return false + } + if device_info.Devcore == 100 && req.Coresreq == 100 && device_usage.Used > 0 { + return false + } + if device_info.Devcore != 0 && device_usage.Usedcores == device_info.Devcore && req.Coresreq == 0 { + return false + } + return true +} + +func getDeviceSnapshot(ads *AscendDevices) []*AscendDevice { + dup_devs := make([]*AscendDevice, 0, len(ads.Devices)) + for _, dev := range ads.Devices { + dup_dev := &AscendDevice{ + config: dev.config, + nodeRegisterAnno: dev.nodeRegisterAnno, + useUUIDAnno: dev.useUUIDAnno, + noUseUUIDAnno: dev.noUseUUIDAnno, + handshakeAnno: dev.handshakeAnno, + DeviceInfo: dev.DeviceInfo, + DeviceUsage: &devices.DeviceUsage{ + Used: dev.DeviceUsage.Used, + Usedmem: dev.DeviceUsage.Usedmem, + Usedcores: dev.DeviceUsage.Usedcores, + }, + } + dup_devs = append(dup_devs, dup_dev) + } + return dup_devs +} + +func selectDevicesWithTopology(req_nums int, selected_devs []*AscendDevice) []*AscendDevice { + network_map := make(map[int][]*AscendDevice) + + for _, dev := range selected_devs { + if dev.DeviceInfo.CustomInfo != nil { + if networkID, ok := dev.DeviceInfo.CustomInfo["NetworkID"]; ok { + if id, ok := networkID.(float64); ok { + network_map[int(id)] = append(network_map[int(id)], dev) + } + } + } + } + type NetworkDeviceCount struct { + NetworkID int + Count int + } + var sortedNetworks []NetworkDeviceCount + for networkID, devices := range network_map { + sortedNetworks = append(sortedNetworks, NetworkDeviceCount{ + NetworkID: networkID, + Count: len(devices), + }) + } + sort.Slice(sortedNetworks, func(i, j int) bool { + return sortedNetworks[i].Count > sortedNetworks[j].Count + }) + devs := make([]*AscendDevice, 0) + for _, item := range sortedNetworks { + for _, dev := range network_map[item.NetworkID] { + devs = append(devs, dev) + if len(devs) == req_nums { + return devs + } + } + } + return devs +} + +func (ads *AscendDevices) getRandomDevice() (*AscendDevice, error) { + if len(ads.Devices) == 0 { + return nil, errors.New("no ascend device available") + } + for _, dev := range ads.Devices { + return dev, nil + } + return nil, errors.New("no ascend device available") +} + +func (dev *AscendDevice) trimMemory(m int64) (int64, string) { + for i := range dev.config.Templates { + if m <= dev.config.Templates[i].Memory { + return dev.config.Templates[i].Memory, dev.config.Templates[i].Name + } + } + if m <= dev.config.MemoryCapacity { + return dev.config.MemoryAllocatable, "" + } + return 0, "" +} + +func InitDevices(config []config.VNPUConfig) []*AscendDevice { + 
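+	// Build one AscendDevice handle per configured VNPU type: derive the HAMi node-register,
+	// use-uuid, no-use-uuid and handshake annotation keys from the commonWord, sort the templates
+	// by memory in ascending order so trimMemory returns the smallest template that fits, and
+	// register the per-type "devices-to-allocate"/"devices-allocated" annotation keys.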
devs := make([]*AscendDevice, 0) + for _, vnpu := range config { + commonWord := vnpu.CommonWord + dev := &AscendDevice{ + config: vnpu, + nodeRegisterAnno: fmt.Sprintf("hami.io/node-register-%s", commonWord), + useUUIDAnno: fmt.Sprintf("hami.io/use-%s-uuid", commonWord), + noUseUUIDAnno: fmt.Sprintf("hami.io/no-use-%s-uuid", commonWord), + handshakeAnno: fmt.Sprintf("hami.io/node-handshake-%s", commonWord), + } + sort.Slice(dev.config.Templates, func(i, j int) bool { + return dev.config.Templates[i].Memory < dev.config.Templates[j].Memory + }) + _, ok := devices.InRequestDevices[commonWord] + if !ok { + devices.InRequestDevices[commonWord] = fmt.Sprintf("hami.io/%s-devices-to-allocate", commonWord) + devices.SupportDevices[commonWord] = fmt.Sprintf("hami.io/%s-devices-allocated", commonWord) + // util.HandshakeAnnos[commonWord] = dev.handshakeAnno + } + devs = append(devs, dev) + klog.Infof("load ascend vnpu config %s: %v", commonWord, dev.config) + } + return devs +} + +func ParseConfig(fs *flag.FlagSet) { + fs.BoolVar(&AscendVNPUEnable, "AscendVNPUEnable", false, "enable ascend device") +} + +func (dev *AscendDevice) CommonWord() string { + return dev.config.CommonWord +} + +func (dev *AscendDevice) GetNodeDevices(n v1.Node) ([]*devices.DeviceInfo, error) { + anno, ok := n.Annotations[dev.nodeRegisterAnno] + if !ok { + return []*devices.DeviceInfo{}, fmt.Errorf("annos not found %s", dev.nodeRegisterAnno) + } + nodeDevices, err := devices.UnMarshalNodeDevices(anno) + if err != nil { + klog.ErrorS(err, "failed to unmarshal node devices", "node", n.Name, "device annotation", anno) + return []*devices.DeviceInfo{}, err + } + if len(nodeDevices) == 0 { + klog.InfoS("no gpu device found", "node", n.Name, "device annotation", anno) + return []*devices.DeviceInfo{}, errors.New("no device found on node") + } + return nodeDevices, nil +} + +func (dev *AscendDevice) GenerateResourceRequests(ctr *v1.Container) devices.ContainerDeviceRequest { + ascendResourceCount := v1.ResourceName(dev.config.ResourceName) + ascendResourceMem := v1.ResourceName(dev.config.ResourceMemoryName) + v, ok := ctr.Resources.Limits[ascendResourceCount] + if !ok { + v, ok = ctr.Resources.Requests[ascendResourceCount] + } + if ok { + if n, ok := v.AsInt64(); ok { + memnum := 0 + mem, ok := ctr.Resources.Limits[ascendResourceMem] + if !ok { + mem, ok = ctr.Resources.Requests[ascendResourceMem] + } + if ok { + memnums, ok := mem.AsInt64() + if ok { + m, _ := dev.trimMemory(memnums) + memnum = int(m) + } + klog.V(5).Infof("raw mem %d memnum %d", memnums, memnum) + } + corenum := int32(0) + + mempnum := 0 + if memnum == 0 { + mempnum = 100 + } + + if corenum > 100 { + klog.ErrorS(nil, "core limit can't exceed 100", "device", dev.config.CommonWord) + corenum = 100 + } + if mempnum != 0 && memnum == 0 { + memnum = int(dev.DeviceInfo.Devmem) * mempnum / 100 + klog.V(5).Infof("new memreq %d totalmem %d mempercentage %d", memnum, dev.DeviceInfo.Devmem, mempnum) + } + + return devices.ContainerDeviceRequest{ + Nums: int32(n), + Type: dev.CommonWord(), + Memreq: int32(memnum), + MemPercentagereq: int32(mempnum), + Coresreq: corenum, + } + } + } + return devices.ContainerDeviceRequest{} +} + +func (dev *AscendDevice) ResourceReqs(pod *v1.Pod) []devices.ContainerDeviceRequest { + var reqs []devices.ContainerDeviceRequest + for _, ctr := range pod.Spec.Containers { + req := dev.GenerateResourceRequests(&ctr) + if req.Nums > 0 { + reqs = append(reqs, req) + } + } + return reqs +} + +func (ads *AscendDevices) PatchAnnotations(pod *v1.Pod, 
annoInput *map[string]string, devList devices.PodSingleDevice) map[string]string { + dev, err := ads.getRandomDevice() + if err != nil { + return *annoInput + } + commonWord := dev.CommonWord() + + (*annoInput)[devices.InRequestDevices[commonWord]] = devices.EncodePodSingleDevice(devList) + (*annoInput)[devices.SupportDevices[commonWord]] = devices.EncodePodSingleDevice(devList) + (*annoInput)["predicate-time"] = strconv.FormatInt(time.Now().Unix(), 10) + allocateStr := fmt.Sprintf("huawei.com/%s", dev.CommonWord()) + var rtInfo []RuntimeInfo + for _, dp := range devList { + for _, val := range dp { + _, temp := dev.trimMemory(int64(val.Usedmem)) + rtInfo = append(rtInfo, RuntimeInfo{ + UUID: val.UUID, + Temp: temp, + }) + } + } + s, err := json.Marshal(rtInfo) + if err != nil { + klog.ErrorS(err, "failed to marshal runtime info", "runtime info", rtInfo) + } + (*annoInput)[allocateStr] = string(s) + + return *annoInput +} + +func (dev *AscendDevice) GetResourceNames() devices.ResourceNames { + return devices.ResourceNames{ + ResourceCountName: dev.config.ResourceName, + ResourceMemoryName: dev.config.ResourceMemoryName, + ResourceCoreName: "", + } +} + +func CalScore(schedulePolicy string, dev_usage *devices.DeviceUsage, dev_info *devices.DeviceInfo) float64 { + var score float64 + switch schedulePolicy { + case binpackPolicy: + score = binpackMultiplier * (float64(dev_usage.Usedmem) / float64(dev_info.Devmem)) + case spreadPolicy: + if dev_usage.Used == 1 { + score = spreadMultiplier + } + default: + score = float64(0) + } + return score +} diff --git a/pkg/scheduler/api/devices/ascend/device_info_test.go b/pkg/scheduler/api/devices/ascend/device_info_test.go new file mode 100644 index 0000000000..eebb04ee43 --- /dev/null +++ b/pkg/scheduler/api/devices/ascend/device_info_test.go @@ -0,0 +1,273 @@ +/* +Copyright 2025 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package ascend + +import ( + "fmt" + "testing" + "github.com/stretchr/testify/assert" + "gopkg.in/yaml.v2" + "volcano.sh/volcano/pkg/scheduler/api/devices" + "volcano.sh/volcano/pkg/scheduler/api/devices/config" +) + +var config_yaml = ` +vnpus: +- chipName: 310P3 + + commonWord: Ascend310P + resourceName: huawei.com/Ascend310P + resourceMemoryName: huawei.com/Ascend310P-memory + memoryAllocatable: 21527 + memoryCapacity: 24576 + aiCore: 8 + aiCPU: 7 + templates: + - name: vir01 + memory: 3072 + aiCore: 1 + aiCPU: 1 + - name: vir02 + memory: 6144 + aiCore: 2 + aiCPU: 2 + - name: vir04 + memory: 12288 + aiCore: 4 + aiCPU: 4 +- chipName: 910B3 + commonWord: Ascend910B3 + resourceName: huawei.com/Ascend910B3 + resourceMemoryName: huawei.com/Ascend910B3-memory + memoryAllocatable: 65536 + memoryCapacity: 65536 + aiCore: 20 + aiCPU: 7 + templates: + - name: vir05_1c_16g + memory: 16384 + aiCore: 5 + aiCPU: 1 + - name: vir10_3c_32g + memory: 32768 + aiCore: 10 + aiCPU: 3 +nvidia: + resourceCountName: volcano.sh/vgpu-number + resourceMemoryName: volcano.sh/vgpu-memory + resourceMemoryPercentageName: volcano.sh/vgpu-memory-percentage + resourceCoreName: volcano.sh/vgpu-cores + overwriteEnv: false + defaultMemory: 0 + defaultCores: 0 + defaultGPUNum: 1 + deviceSplitCount: 10 + deviceMemoryScaling: 1 + deviceCoreScaling: 1 + gpuMemoryFactor: 1 + knownMigGeometries: + - models: [ "A30" ] + allowedGeometries: + - group: group1 + geometries: + - name: 1g.6gb + memory: 6144 + count: 4 + - group: group2 + geometries: + - name: 2g.12gb + memory: 12288 + count: 2 + - group: group3 + geometries: + - name: 4g.24gb + memory: 24576 + count: 1 +` + +func yamlStringToConfig(yamlStr string) (*config.Config, error) { + var config config.Config + err := yaml.Unmarshal([]byte(yamlStr), &config) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal YAML: %v", err) + } + return &config, nil +} + +func Test_trimMemory(t *testing.T) { + _, err := yamlStringToConfig(config_yaml) + conf, err := yamlStringToConfig(config_yaml) + assert.Nil(t, err) + dev := AscendDevice{ + config: conf.VNPUs[0], + } + tests := []struct { + name string + inputMem int64 + wantMem int64 + }{ + {"test1", 0, 3072}, + {"test2", 1, 3072}, + {"test3", 100, 3072}, + {"test4", 3071, 3072}, + {"test5", 3072, 3072}, + {"test6", 3073, 6144}, + {"test7", 6143, 6144}, + {"test8", 6144, 6144}, + {"test9", 6144, 6144}, + {"test10", 6145, 12288}, + {"test11", 12288, 12288}, + {"test12", 12289, 21527}, + {"test13", 21527, 21527}, + {"test14", 21528, 21527}, + {"test15", 24576, 21527}, + {"test16", 24577, 0}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, _ := dev.trimMemory(tt.inputMem) + assert.Equal(t, tt.wantMem, got) + }) + } +} + +func Test_fit(t *testing.T) { + _, err := yamlStringToConfig(config_yaml) + conf, err := yamlStringToConfig(config_yaml) + assert.Nil(t, err) + device_info := &devices.DeviceInfo{ + ID: "68496E64-20E05477-92C31323-6E78030A-BD003019", + Index: 0, + Count: 7, + Devcore: 8, + Devmem: 21527, + } + tests := []struct { + name string + req *devices.ContainerDeviceRequest + dev *AscendDevice + result bool + }{ + { + "test1", + &devices.ContainerDeviceRequest { + Nums: 1, + Type: "Ascend310P", + Memreq: 1024, + }, + &AscendDevice { + config: conf.VNPUs[0], + DeviceInfo: device_info, + DeviceUsage: &devices.DeviceUsage{ + Used: 1, + Usedmem: 3072, + }, + }, + true, + }, + { + "test2", + &devices.ContainerDeviceRequest { + Nums: 1, + Type: "Ascend310P", + Memreq: 21527, + }, + &AscendDevice 
{ + config: conf.VNPUs[0], + DeviceInfo: device_info, + DeviceUsage: &devices.DeviceUsage{ + Used: 1, + Usedmem: 3072, + }, + }, + false, + }, + { + "test3", + &devices.ContainerDeviceRequest { + Nums: 1, + Type: "Ascend310P", + Memreq: 6144, + }, + &AscendDevice { + config: conf.VNPUs[0], + DeviceInfo: device_info, + DeviceUsage: &devices.DeviceUsage{ + Used: 1, + Usedmem: 12288, + }, + }, + true, + }, + { + "test4", + &devices.ContainerDeviceRequest { + Nums: 1, + Type: "Ascend310P", + Memreq: 24576, + }, + &AscendDevice { + config: conf.VNPUs[0], + DeviceInfo: device_info, + DeviceUsage: &devices.DeviceUsage{ + Used: 0, + Usedmem: 0, + }, + }, + false, + }, + { + "test5_core", + &devices.ContainerDeviceRequest { + Nums: 1, + Type: "Ascend310P", + Memreq: 6144, + Coresreq: 4, + }, + &AscendDevice { + config: conf.VNPUs[0], + DeviceInfo: device_info, + DeviceUsage: &devices.DeviceUsage{ + Used: 1, + Usedmem: 12288, + Usedcores: 6, + }, + }, + false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ret := fit(tt.req, tt.dev) + assert.Equal(t, tt.result, ret) + }) + } +} diff --git a/pkg/scheduler/api/devices/config/config.go b/pkg/scheduler/api/devices/config/config.go index ad61bacbf2..d5dc56d5dc 100644 --- a/pkg/scheduler/api/devices/config/config.go +++ b/pkg/scheduler/api/devices/config/config.go @@ -44,11 +44,30 @@ const ( hygon: resourceCountName: "volcano.sh/vdcu" ... + vnpus: + - chipName: 910B3 + commonWord: Ascend910B3 + resourceName: huawei.com/Ascend910B3 + resourceMemoryName: huawei.com/Ascend910B3-memory + memoryAllocatable: 65536 + memoryCapacity: 65536 + aiCore: 20 + aiCPU: 7 + templates: + - name: vir05_1c_16g + memory: 16384 + aiCore: 5 + aiCPU: 1 + - name: vir10_3c_32g + memory: 32768 + aiCore: 10 + aiCPU: 3 */ type Config struct { //NvidiaConfig is used for vGPU feature for nvidia, gpushare is not using this config NvidiaConfig NvidiaConfig `yaml:"nvidia"` + VNPUs []VNPUConfig `yaml:"vnpus"` } var ( diff --git a/pkg/scheduler/api/devices/config/vnpu.go b/pkg/scheduler/api/devices/config/vnpu.go new file mode 100644 index 0000000000..dc02910352 --- /dev/null +++ b/pkg/scheduler/api/devices/config/vnpu.go @@ -0,0 +1,36 @@ +/* +Copyright 2025 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package config + +type Template struct { + Name string `yaml:"name"` + Memory int64 `yaml:"memory"` + AICore int32 `yaml:"aiCore,omitempty"` + AICPU int32 `yaml:"aiCPU,omitempty"` +} + +type VNPUConfig struct { + CommonWord string `yaml:"commonWord"` + ChipName string `yaml:"chipName"` + ResourceName string `yaml:"resourceName"` + ResourceMemoryName string `yaml:"resourceMemoryName"` + MemoryAllocatable int64 `yaml:"memoryAllocatable"` + MemoryCapacity int64 `yaml:"memoryCapacity"` + AICore int32 `yaml:"aiCore"` + AICPU int32 `yaml:"aiCPU"` + Templates []Template `yaml:"templates"` +} \ No newline at end of file diff --git a/pkg/scheduler/api/devices/device_info.go b/pkg/scheduler/api/devices/device_info.go new file mode 100644 index 0000000000..be13a22ebc --- /dev/null +++ b/pkg/scheduler/api/devices/device_info.go @@ -0,0 +1,219 @@ +/* +Copyright 2023 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package devices + +import ( + "encoding/json" + "fmt" + "strconv" + "strings" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/klog/v2" +) + +type DeviceInfo struct { + ID string `json:"id,omitempty"` + Index uint `json:"index,omitempty"` + Count int32 `json:"count,omitempty"` + Devmem int32 `json:"devmem,omitempty"` + Devcore int32 `json:"devcore,omitempty"` + Type string `json:"type,omitempty"` + Numa int `json:"numa,omitempty"` + Mode string `json:"mode,omitempty"` + // MIGTemplate []Geometry `json:"migtemplate,omitempty"` + Health bool `json:"health,omitempty"` + DeviceVendor string `json:"devicevendor,omitempty"` + CustomInfo map[string]any `json:"custominfo,omitempty"` + DevicePairScore DevicePairScore `json:"devicepairscore,omitempty"` +} + +type DevicePairScores []DevicePairScore +type DevicePairScore struct { + ID string `json:"uuid,omitempty"` + Scores map[string]int `json:"score,omitempty"` +} + +type DeviceUsage struct { + // ID string + // Index uint + Used int32 + // Count int32 + Usedmem int32 + // Totalmem int32 + // Totalcore int32 + Usedcores int32 + // Mode string + // MigTemplate []Geometry + // MigUsage MigInUse + // Numa int + // Type string + // Health bool + // CustomInfo map[string]any +} + +type ResourceNames struct { + ResourceCountName string + ResourceMemoryName string + ResourceCoreName string +} + +type ContainerDevice struct { + // TODO current Idx cannot use, because EncodeContainerDevices method not encode this filed. 
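+	// EncodeContainerDevices only serializes UUID,Type,Usedmem,Usedcores (comma separated, ":"
+	// between devices of one container, ";" between containers in EncodePodSingleDevice), so Idx
+	// and CustomInfo do not survive a decode round trip.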
+ Idx int + UUID string + Type string + Usedmem int32 + Usedcores int32 + CustomInfo map[string]any +} + +type ContainerDeviceRequest struct { + Nums int32 + Type string + Memreq int32 + MemPercentagereq int32 + Coresreq int32 +} + +const ( + // OneContainerMultiDeviceSplitSymbol this is when one container use multi device, use : symbol to join device info. + OneContainerMultiDeviceSplitSymbol = ":" + + // OnePodMultiContainerSplitSymbol this is when one pod having multi container and more than one container use device, use ; symbol to join device info. + OnePodMultiContainerSplitSymbol = ";" +) + +var ( + HandshakeAnnos = map[string]string{} + InRequestDevices = map[string]string{} + SupportDevices = map[string]string{} +) + +type ContainerDevices []ContainerDevice +type ContainerDeviceRequests map[string]ContainerDeviceRequest +type PodDeviceRequests []ContainerDeviceRequests +type PodSingleDevice []ContainerDevices +type PodDevices map[string]PodSingleDevice + +func CheckHealth(devType string, n *corev1.Node) (bool, bool) { + handshake := n.Annotations[HandshakeAnnos[devType]] + if strings.Contains(handshake, "Requesting") { + formertime, _ := time.Parse(time.DateTime, strings.Split(handshake, "_")[1]) + return time.Now().Before(formertime.Add(time.Second * 60)), false + } else if strings.Contains(handshake, "Deleted") { + return true, false + } else { + return true, true + } +} + +func UnMarshalNodeDevices(str string) ([]*DeviceInfo, error) { + var dlist []*DeviceInfo + err := json.Unmarshal([]byte(str), &dlist) + return dlist, err +} + +func EncodeContainerDevices(cd ContainerDevices) string { + tmp := "" + for _, val := range cd { + tmp += val.UUID + "," + val.Type + "," + strconv.Itoa(int(val.Usedmem)) + "," + strconv.Itoa(int(val.Usedcores)) + OneContainerMultiDeviceSplitSymbol + } + klog.Infof("Encoded container Devices: %s", tmp) + return tmp + //return strings.Join(cd, ",") +} + +func EncodePodSingleDevice(pd PodSingleDevice) string { + res := "" + for _, ctrdevs := range pd { + res = res + EncodeContainerDevices(ctrdevs) + res = res + OnePodMultiContainerSplitSymbol + } + klog.Infof("Encoded pod single devices %s", res) + return res +} + +func DecodeContainerDevices(str string) (ContainerDevices, error) { + if len(str) == 0 { + return ContainerDevices{}, nil + } + cd := strings.Split(str, OneContainerMultiDeviceSplitSymbol) + contdev := ContainerDevices{} + tmpdev := ContainerDevice{} + klog.V(5).Infof("Start to decode container device %s", str) + for _, val := range cd { + if strings.Contains(val, ",") { + //fmt.Println("cd is ", val) + tmpstr := strings.Split(val, ",") + if len(tmpstr) < 4 { + return ContainerDevices{}, fmt.Errorf("pod annotation format error; information missing, please do not use nodeName field in task") + } + tmpdev.UUID = tmpstr[0] + tmpdev.Type = tmpstr[1] + devmem, _ := strconv.ParseInt(tmpstr[2], 10, 32) + tmpdev.Usedmem = int32(devmem) + devcores, _ := strconv.ParseInt(tmpstr[3], 10, 32) + tmpdev.Usedcores = int32(devcores) + contdev = append(contdev, tmpdev) + } + } + klog.V(5).Infof("Finished decoding container devices. 
Total devices: %d", len(contdev)) + return contdev, nil +} + +func DecodePodDevices(checklist map[string]string, annos map[string]string) (PodDevices, error) { + klog.V(5).Infof("checklist is [%+v], annos is [%+v]", checklist, annos) + if len(annos) == 0 { + return PodDevices{}, nil + } + pd := make(PodDevices) + for devID, devs := range checklist { + str, ok := annos[devs] + if !ok { + continue + } + pd[devID] = make(PodSingleDevice, 0) + for s := range strings.SplitSeq(str, OnePodMultiContainerSplitSymbol) { + cd, err := DecodeContainerDevices(s) + if err != nil { + return PodDevices{}, nil + } + if len(cd) == 0 { + continue + } + pd[devID] = append(pd[devID], cd) + } + } + klog.V(5).InfoS("Decoded pod annos", "poddevices", pd) + return pd, nil +} diff --git a/pkg/scheduler/api/devices/util.go b/pkg/scheduler/api/devices/util.go index e23829f663..e04499e512 100644 --- a/pkg/scheduler/api/devices/util.go +++ b/pkg/scheduler/api/devices/util.go @@ -31,6 +31,15 @@ limitations under the License. package devices import ( + "context" + "encoding/json" + "fmt" + "time" + + v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8stypes "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/klog/v2" @@ -59,6 +68,16 @@ const ( Skip ) +const ( + AssignedNodeAnnotations = "hami.io/vgpu-node" + AssignedTimeAnnotations = "hami.io/vgpu-time" + BindTimeAnnotations = "hami.io/bind-time" + DeviceBindPhase = "hami.io/bind-phase" + DeviceBindAllocating = "allocating" + DeviceBindFailed = "failed" + DeviceBindSuccess = "success" +) + var kubeClient *kubernetes.Clientset func GetClient() kubernetes.Interface { @@ -81,3 +100,90 @@ func NewClient() (*kubernetes.Clientset, error) { client, err := kubernetes.NewForConfig(config) return client, err } + +func GetNode(nodename string) (*v1.Node, error) { + if nodename == "" { + klog.ErrorS(nil, "Node name is empty") + return nil, fmt.Errorf("nodename is empty") + } + + klog.V(5).InfoS("Fetching node", "nodeName", nodename) + n, err := GetClient().CoreV1().Nodes().Get(context.Background(), nodename, metav1.GetOptions{}) + if err != nil { + switch { + case apierrors.IsNotFound(err): + klog.ErrorS(err, "Node not found", "nodeName", nodename) + return nil, fmt.Errorf("node %s not found", nodename) + case apierrors.IsUnauthorized(err): + klog.ErrorS(err, "Unauthorized to access node", "nodeName", nodename) + return nil, fmt.Errorf("unauthorized to access node %s", nodename) + default: + klog.ErrorS(err, "Failed to get node", "nodeName", nodename) + return nil, fmt.Errorf("failed to get node %s: %v", nodename, err) + } + } + + klog.V(5).InfoS("Successfully fetched node", "nodeName", nodename) + return n, nil +} + +func MarkAnnotationsToDelete(devType string, nn string) error { + tmppat := make(map[string]string) + tmppat[devType] = "Deleted_" + time.Now().Format(time.DateTime) + n, err := GetNode(nn) + if err != nil { + klog.Errorln("get node failed", err.Error()) + return err + } + return PatchNodeAnnotations(n, tmppat) +} + +func PatchPodAnnotations(kubeClient kubernetes.Interface, pod *v1.Pod, annotations map[string]string) error { + type patchMetadata struct { + Annotations map[string]string `json:"annotations,omitempty"` + } + type patchPod struct { + Metadata patchMetadata `json:"metadata"` + //Spec patchSpec `json:"spec,omitempty"` + } + + p := patchPod{} + p.Metadata.Annotations = annotations + + bytes, err := json.Marshal(p) + if err != nil { + return err + 
} + _, err = kubeClient.CoreV1().Pods(pod.Namespace). + Patch(context.Background(), pod.Name, k8stypes.StrategicMergePatchType, bytes, metav1.PatchOptions{}) + if err != nil { + klog.Errorf("patch pod %v failed, %v", pod.Name, err) + } + + return err +} + +func PatchNodeAnnotations(node *v1.Node, annotations map[string]string) error { + type patchMetadata struct { + Annotations map[string]string `json:"annotations,omitempty"` + } + type patchPod struct { + Metadata patchMetadata `json:"metadata"` + //Spec patchSpec `json:"spec,omitempty"` + } + + p := patchPod{} + p.Metadata.Annotations = annotations + + bytes, err := json.Marshal(p) + if err != nil { + return err + } + _, err = GetClient().CoreV1().Nodes(). + Patch(context.Background(), node.Name, k8stypes.StrategicMergePatchType, bytes, metav1.PatchOptions{}) + if err != nil { + klog.Infoln("annotations=", annotations) + klog.Infof("patch node %v failed, %v", node.Name, err) + } + return err +} diff --git a/pkg/scheduler/api/node_info.go b/pkg/scheduler/api/node_info.go index 12306aeae7..3f9e3ccaf2 100644 --- a/pkg/scheduler/api/node_info.go +++ b/pkg/scheduler/api/node_info.go @@ -24,6 +24,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" fwk "k8s.io/kube-scheduler/framework" + "volcano.sh/volcano/pkg/scheduler/api/devices/ascend" "volcano.sh/apis/pkg/apis/scheduling" "volcano.sh/apis/pkg/apis/scheduling/v1beta1" @@ -350,10 +351,16 @@ func (ni *NodeInfo) setNodeOthersResource(node *v1.Node) { ni.Others[gpushare.DeviceName] = gpushare.NewGPUDevices(ni.Name, node) ni.Others[vgpu.DeviceName] = vgpu.NewGPUDevices(ni.Name, node) ni.Others[vnpu.DeviceName] = vnpu.NewNPUDevices(ni.Name, node) + ascend_ignored_list := []string{} + for device_name, devices := range ascend.NewAscendDevices(ni.Name, node) { + ni.Others[device_name] = devices + ascend_ignored_list = append(ascend_ignored_list, devices.GetIgnoredDevices()...) + } IgnoredDevicesList.Set( ni.Others[gpushare.DeviceName].(Devices).GetIgnoredDevices(), ni.Others[vgpu.DeviceName].(Devices).GetIgnoredDevices(), ni.Others[vnpu.DeviceName].(Devices).GetIgnoredDevices(), + ascend_ignored_list, ) } @@ -507,6 +514,13 @@ func (ni *NodeInfo) addResource(pod *v1.Pod) { } ni.Others[vgpu.DeviceName].(Devices).AddResource(pod) ni.Others[vnpu.DeviceName].(Devices).AddResource(pod) + for _, name := range ascend.GetAscendDeviceNames() { + if other, exists := ni.Others[name]; exists { + if devices, ok := other.(Devices); ok { + devices.AddResource(pod) + } + } + } } // subResource is used to subtract sharable devices @@ -516,6 +530,13 @@ func (ni *NodeInfo) subResource(pod *v1.Pod) { } ni.Others[vgpu.DeviceName].(Devices).SubResource(pod) ni.Others[vnpu.DeviceName].(Devices).SubResource(pod) + for _, name := range ascend.GetAscendDeviceNames() { + if other, exists := ni.Others[name]; exists { + if devices, ok := other.(Devices); ok { + devices.SubResource(pod) + } + } + } } // UpdateTask is used to update a task in nodeInfo object. 
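The node_info.go hunk above keys NewAscendDevices results into ni.Others by each VNPU commonWord, all driven by the HAMi node-register annotation that GetNodeDevices parses. Below is a minimal round-trip sketch of that annotation payload; it is not part of the patch, and the test name and device values are illustrative assumptions.

package ascend

import (
	"encoding/json"
	"testing"

	"github.com/stretchr/testify/assert"

	"volcano.sh/volcano/pkg/scheduler/api/devices"
)

// Sketch only: shows the []*devices.DeviceInfo JSON payload expected under the
// "hami.io/node-register-<commonWord>" node annotation (assumed example values).
func Test_nodeRegisterAnnotation_sketch(t *testing.T) {
	registered := []*devices.DeviceInfo{
		{
			ID:         "Ascend910B3-0",
			Count:      16,
			Devmem:     65536,
			Devcore:    20,
			CustomInfo: map[string]any{"NetworkID": float64(0)},
		},
	}
	raw, err := json.Marshal(registered)
	assert.Nil(t, err)

	parsed, err := devices.UnMarshalNodeDevices(string(raw))
	assert.Nil(t, err)
	assert.Equal(t, "Ascend910B3-0", parsed[0].ID)
	// JSON decoding turns NetworkID into a float64, which is why ScoreNode and
	// selectDevicesWithTopology assert networkID.(float64) before grouping by network.
	_, isFloat := parsed[0].CustomInfo["NetworkID"].(float64)
	assert.True(t, isFloat)
}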
diff --git a/pkg/scheduler/api/shared_device_pool.go b/pkg/scheduler/api/shared_device_pool.go index c6e524140a..b96196098f 100644 --- a/pkg/scheduler/api/shared_device_pool.go +++ b/pkg/scheduler/api/shared_device_pool.go @@ -23,6 +23,7 @@ import ( "k8s.io/client-go/kubernetes" "volcano.sh/volcano/pkg/scheduler/api/devices/ascend/ascend310p/vnpu" + "volcano.sh/volcano/pkg/scheduler/api/devices/ascend" "volcano.sh/volcano/pkg/scheduler/api/devices/nvidia/gpushare" "volcano.sh/volcano/pkg/scheduler/api/devices/nvidia/vgpu" ) @@ -78,6 +79,7 @@ type Devices interface { var _ Devices = new(gpushare.GPUDevices) var _ Devices = new(vgpu.GPUDevices) var _ Devices = new(vnpu.NPUDevices) +var _ Devices = new(ascend.AscendDevices) var RegisteredDevices = []string{ gpushare.DeviceName, @@ -85,6 +87,10 @@ var RegisteredDevices = []string{ vnpu.DeviceName, } +func RegisterDevice(deviceName string) { + RegisteredDevices = append(RegisteredDevices, deviceName) +} + var IgnoredDevicesList = ignoredDevicesList{} type ignoredDevicesList struct { diff --git a/pkg/scheduler/plugins/deviceshare/deviceshare.go b/pkg/scheduler/plugins/deviceshare/deviceshare.go index fc59e36d41..d574c4ab48 100644 --- a/pkg/scheduler/plugins/deviceshare/deviceshare.go +++ b/pkg/scheduler/plugins/deviceshare/deviceshare.go @@ -21,6 +21,7 @@ import ( "fmt" "math" "reflect" + "sync" v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" @@ -28,6 +29,7 @@ import ( "volcano.sh/volcano/pkg/scheduler/api" "volcano.sh/volcano/pkg/scheduler/api/devices" + "volcano.sh/volcano/pkg/scheduler/api/devices/ascend" "volcano.sh/volcano/pkg/scheduler/api/devices/ascend/ascend310p/vnpu" "volcano.sh/volcano/pkg/scheduler/api/devices/config" "volcano.sh/volcano/pkg/scheduler/api/devices/nvidia/gpushare" @@ -46,7 +48,8 @@ const ( VGPUEnable = "deviceshare.VGPUEnable" - ASCEND310PvGPU = "deviceshare.ASCEND310PVNPUEnable" + ASCEND310PvGPU = "deviceshare.ASCEND310PVGPUEnable" + AscendVNPUEnable = "deviceshare.AscendVNPUEnable" SchedulePolicyArgument = "deviceshare.SchedulePolicy" ScheduleWeight = "deviceshare.ScheduleWeight" @@ -55,6 +58,10 @@ const ( KnownGeometriesCMNamespace = "deviceshare.KnownGeometriesCMNamespace" ) +var ( + once sync.Once +) + type deviceSharePlugin struct { // Arguments given for the plugin pluginArguments framework.Arguments @@ -82,6 +89,7 @@ func enablePredicate(dsp *deviceSharePlugin) { args.GetBool(&nodeLockEnable, NodeLockEnable) args.GetBool(&vgpu.VGPUEnable, VGPUEnable) args.GetBool(&vnpu.Ascend310pvNPUEnable, ASCEND310PvGPU) + args.GetBool(&ascend.AscendVNPUEnable, AscendVNPUEnable) gpushare.NodeLockEnable = nodeLockEnable vgpu.NodeLockEnable = nodeLockEnable @@ -96,14 +104,23 @@ func enablePredicate(dsp *deviceSharePlugin) { klog.Fatal("gpu-share and vgpu can't be used together") } - if !vgpu.VGPUEnable { - return - } knownGeometriesCMName := "volcano-vgpu-device-config" args.GetString(&knownGeometriesCMName, KnownGeometriesCMName) knownGeometriesCMNamespace := "kube-system" args.GetString(&knownGeometriesCMNamespace, KnownGeometriesCMNamespace) config.InitDevicesConfig(knownGeometriesCMName, knownGeometriesCMNamespace) + registerDevices() +} + +func registerDevices() { + once.Do(func() { + if ascend.AscendVNPUEnable { + for _, vnpu := range config.GetConfig().VNPUs { + klog.Infof("register device %s", vnpu.CommonWord) + api.RegisterDevice(vnpu.CommonWord) + } + } + }) } func createStatus(code int, reason string) *api.Status { @@ -118,16 +135,12 @@ func getDeviceScore(ctx context.Context, pod *v1.Pod, node *api.NodeInfo, schedu s := 
float64(0) for deviceType, device := range node.Others { if device.(api.Devices).HasDeviceRequest(pod) { - var ns float64 // Only process device types that use NodeOrderFn (vgpu and gpushare) // vnpu devices use BatchNodeOrderFn, skip them here - if deviceType == vgpu.DeviceName || deviceType == gpushare.DeviceName { - ns = device.(api.Devices).ScoreNode(pod, schedulePolicy) - } else { - // Other device types (like vnpu) use BatchNodeOrderFn, skip scoring here - continue + if deviceType != vnpu.NPUDevices { + ns := device.(api.Devices).ScoreNode(pod, schedulePolicy) + s += ns } - s += ns } } klog.V(4).Infof("deviceScore for task %s/%s is: %v", pod.Namespace, pod.Name, s) @@ -172,11 +185,14 @@ func initializeDevicesWithSession(ssn *framework.Session) { func initializeDevice(device api.Devices, ssn *framework.Session, nodeInfo *api.NodeInfo) error { switch d := device.(type) { case *vnpu.NPUDevices: - klog.V(3).Infof("initialize ascend310p device.") - return vnpu310p.InitVNPUDevice(d, ssn, nodeInfo) + if vnpu.Ascend310pvNPUEnable { + klog.V(3).Infof("initialize ascend310p device.") + return vnpu310p.InitVNPUDevice(d, ssn, nodeInfo) + } default: return nil } + return nil } func (dp *deviceSharePlugin) OnSessionOpen(ssn *framework.Session) { @@ -191,7 +207,7 @@ func (dp *deviceSharePlugin) OnSessionOpen(ssn *framework.Session) { if dev, ok := node.Others[val].(api.Devices); ok { if reflect.ValueOf(dev).IsNil() { // TODO When a pod requests a device of the current type, but the current node does not have such a device, an error is thrown - if dev == nil || dev.HasDeviceRequest(task.Pod) { + if dev == nil { predicateStatus = append(predicateStatus, &api.Status{ Code: devices.Unschedulable, Reason: "node not initialized with device" + val, @@ -202,8 +218,13 @@ func (dp *deviceSharePlugin) OnSessionOpen(ssn *framework.Session) { klog.V(4).Infof("pod %s/%s did not request device %s on %s, skipping it", task.Pod.Namespace, task.Pod.Name, val, node.Name) continue } + if !dev.HasDeviceRequest(task.Pod) { + klog.V(4).Infof("pod %s/%s did not request device %s on %s, skipping it", task.Pod.Namespace, task.Pod.Name, val, node.Name) + continue + } code, msg, err := dev.FilterNode(task.Pod, dp.schedulePolicy) if err != nil { + klog.V(4).Infof("pod %s/%s fit failed. device %s node %s err %v", task.Pod.Namespace, task.Pod.Name, val, node.Name, err) predicateStatus = append(predicateStatus, createStatus(code, msg)) return api.NewFitErrWithStatus(task, node, predicateStatus...) }
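A minimal sketch (not part of this patch; the test name and numbers are illustrative assumptions) of how CalScore behaves under the binpack and spread policies introduced in device_info.go:

package ascend

import (
	"testing"

	"github.com/stretchr/testify/assert"

	"volcano.sh/volcano/pkg/scheduler/api/devices"
)

func Test_CalScore_sketch(t *testing.T) {
	usage := &devices.DeviceUsage{Used: 1, Usedmem: 16384, Usedcores: 4}
	info := &devices.DeviceInfo{ID: "dev-0", Devmem: 32768, Devcore: 8}

	// binpack prefers devices whose memory is already heavily used:
	// score = binpackMultiplier * Usedmem / Devmem = 100 * 16384 / 32768 = 50.
	assert.InDelta(t, 50.0, CalScore(binpackPolicy, usage, info), 0.001)

	// spread only rewards a device that currently holds exactly one allocation.
	assert.Equal(t, float64(spreadMultiplier), CalScore(spreadPolicy, usage, info))

	// Any other policy name yields a zero score.
	assert.Equal(t, float64(0), CalScore("node-order", usage, info))
}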