diff --git a/logstash-core/lib/logstash/instrument/periodic_poller/cgroup.rb b/logstash-core/lib/logstash/instrument/periodic_poller/cgroup.rb index 5213f83ccfb..c6439eab22b 100644 --- a/logstash-core/lib/logstash/instrument/periodic_poller/cgroup.rb +++ b/logstash-core/lib/logstash/instrument/periodic_poller/cgroup.rb @@ -52,7 +52,10 @@ def override(other) CGROUP_FILE = "/proc/self/cgroup" CPUACCT_DIR = "/sys/fs/cgroup/cpuacct" CPU_DIR = "/sys/fs/cgroup/cpu" - CRITICAL_PATHS = [CGROUP_FILE, CPUACCT_DIR, CPU_DIR] + CGROUP2_DIR = "/sys/fs/cgroup" + CRITICAL_PATHS_V1 = [CGROUP_FILE, CPUACCT_DIR, CPU_DIR] + CRITICAL_PATHS = CRITICAL_PATHS_V1 # backward compat for logging + CONTROL_GROUP_V2_RE = Regexp.compile("^0::(/.*)") CONTROLLER_CPUACCT_LABEL = "cpuacct" CONTROLLER_CPU_LABEL = "cpu" @@ -63,12 +66,28 @@ class CGroupResources def cgroup_available? # don't cache to ivar, in case the files are mounted after logstash starts?? - CRITICAL_PATHS.all? {|path| ::File.exist?(path)} + cgroup_v1_available? || cgroup_v2_available? + end + + def cgroup_v1_available? + CRITICAL_PATHS_V1.all? { |path| ::File.exist?(path) } + end + + def cgroup_v2_available? + ::File.exist?(CGROUP_FILE) && + ::File.exist?(::File.join(CGROUP2_DIR, "cgroup.controllers")) end def controller_groups response = {} + v2_path = nil IO.readlines(CGROUP_FILE).each do |line| + # capture v2 unified hierarchy path (0::/path) + v2_match = CONTROL_GROUP_V2_RE.match(line) + if v2_match + v2_path = v2_match[1] + end + matches = CONTROL_GROUP_RE.match(line) next if matches.nil? # multiples controls, same hierarchy @@ -84,6 +103,15 @@ def controller_groups end end end + + # If cpu/cpuacct not found via v1, try v2 unified hierarchy + if v2_path && !response.key?(CONTROLLER_CPU_LABEL) && !response.key?(CONTROLLER_CPUACCT_LABEL) + if cgroup_v2_available? + response[CONTROLLER_CPU_LABEL] = CpuResourceV2.new(v2_path) + response[CONTROLLER_CPUACCT_LABEL] = CpuAcctResourceV2.new(v2_path) + end + end + response end end @@ -210,6 +238,93 @@ def to_hash end end + class CpuAcctResourceV2 + include LogStash::Util::Loggable + include ControllerResource + + def initialize(original_path) + common_initialize(CGROUP2_DIR, "ls.cgroup.cpuacct.path.override", original_path) + end + + def to_hash + {:control_group => offset_path, :usage_nanos => cpu_usage_nanos} + end + + private + def cpu_usage_nanos + lines = call_if_file_exists(:read_lines, "cpu.stat", []) + lines.each do |line| + fields = line.split(/\s+/) + return fields[1].to_i * 1000 if fields.first == "usage_usec" + end + -1 + end + end + + class CpuResourceV2 + include LogStash::Util::Loggable + include ControllerResource + + def initialize(original_path) + common_initialize(CGROUP2_DIR, "ls.cgroup.cpu.path.override", original_path) + end + + def to_hash + quota, period = read_cpu_max + { + :control_group => offset_path, + :cfs_period_micros => period, + :cfs_quota_micros => quota, + :stat => build_cpu_stats_hash + } + end + + private + def read_cpu_max + content = call_if_file_exists(:read_lines, "cpu.max", []) + return [-1, -1] if content.empty? + parts = content.first.split(/\s+/) + quota = parts[0] == "max" ? -1 : parts[0].to_i + period = parts[1].to_i + [quota, period] + end + + def build_cpu_stats_hash + stats = CpuStatsV2.new + lines = call_if_file_exists(:read_lines, "cpu.stat", []) + stats.update(lines) + stats.to_hash + end + end + + class CpuStatsV2 + def initialize + @number_of_elapsed_periods = -1 + @number_of_times_throttled = -1 + @time_throttled_nanos = -1 + end + + def update(lines) + lines.each do |line| + fields = line.split(/\s+/) + next unless fields.size > 1 + case fields.first + when "nr_periods" then @number_of_elapsed_periods = fields[1].to_i + when "nr_throttled" then @number_of_times_throttled = fields[1].to_i + when "throttled_usec" then @time_throttled_nanos = fields[1].to_i * 1000 + end + end + end + + def to_hash + { + :number_of_elapsed_periods => @number_of_elapsed_periods, + :number_of_times_throttled => @number_of_times_throttled, + :time_throttled_nanos => @time_throttled_nanos + } + end + end + CGROUP_RESOURCES = CGroupResources.new class << self diff --git a/logstash-core/spec/logstash/instrument/periodic_poller/cgroup_spec.rb b/logstash-core/spec/logstash/instrument/periodic_poller/cgroup_spec.rb index 2873b3a9b29..10696c0798f 100644 --- a/logstash-core/spec/logstash/instrument/periodic_poller/cgroup_spec.rb +++ b/logstash-core/spec/logstash/instrument/periodic_poller/cgroup_spec.rb @@ -291,5 +291,295 @@ module LogStash module Instrument module PeriodicPoller end end end + + # --- cgroups v2 tests --- + + describe Cgroup::CGroupResources do + subject(:cgroup_resources) { described_class.new } + + context "cgroup v2 detection" do + context "cgroup_available? returns true when only v2 paths exist" do + before do + allow(::File).to receive(:exist?).and_return(false) + allow(::File).to receive(:exist?).with("/proc/self/cgroup").and_return(true) + allow(::File).to receive(:exist?).with("/sys/fs/cgroup/cgroup.controllers").and_return(true) + end + + it "returns true" do + expect(cgroup_resources.cgroup_available?).to be_truthy + end + end + + context "cgroup_available? returns false when neither v1 nor v2 paths exist" do + before do + allow(::File).to receive(:exist?).and_return(false) + allow(::File).to receive(:exist?).with("/proc/self/cgroup").and_return(true) + allow(::File).to receive(:exist?).with("/sys/fs/cgroup/cgroup.controllers").and_return(false) + end + + it "returns false" do + expect(cgroup_resources.cgroup_available?).to be_falsey + end + end + + context "controller_groups returns v2 resources for v2-only /proc/self/cgroup" do + let(:v2_relative_path) { "/system.slice/docker-abc123.scope" } + let(:proc_self_cgroup_v2) { ["0::#{v2_relative_path}"] } + + before do + allow(IO).to receive(:readlines).with("/proc/self/cgroup").and_return(proc_self_cgroup_v2) + allow(::File).to receive(:exist?).with("/proc/self/cgroup").and_return(true) + allow(::File).to receive(:exist?).with("/sys/fs/cgroup/cgroup.controllers").and_return(true) + end + + it "returns v2 cpu and cpuacct resources" do + controllers = cgroup_resources.controller_groups + + controller = controllers["cpuacct"] + expect(controller).to be_a(Cgroup::CpuAcctResourceV2) + expect(controller.base_path).to eq("/sys/fs/cgroup") + expect(controller.offset_path).to eq(v2_relative_path) + + controller = controllers["cpu"] + expect(controller).to be_a(Cgroup::CpuResourceV2) + expect(controller.base_path).to eq("/sys/fs/cgroup") + expect(controller.offset_path).to eq(v2_relative_path) + end + end + + context "controller_groups returns v1 resources in hybrid mode (v1 takes priority)" do + let(:hybrid_cgroup_content) do + %W(4:cpuacct:#{relative_path} + 3:cpu:#{relative_path} + 0::/docker) + end + + before do + allow(IO).to receive(:readlines).with("/proc/self/cgroup").and_return(hybrid_cgroup_content) + allow(::File).to receive(:exist?).with("/sys/fs/cgroup/cgroup.controllers").and_return(true) + end + + it "returns v1 resources, not v2" do + controllers = cgroup_resources.controller_groups + + expect(controllers["cpuacct"]).to be_a(Cgroup::CpuAcctResource) + expect(controllers["cpu"]).to be_a(Cgroup::CpuResource) + end + end + end + end + + describe Cgroup::CpuAcctResourceV2 do + subject(:cpuacct_resource_v2) { described_class.new("/bar") } + + describe "method: to_hash, without override" do + context "when cpu.stat contains usage_usec" do + before do + allow(::File).to receive(:exist?).and_return(true) + allow(IO).to receive(:readlines).with("/sys/fs/cgroup/bar/cpu.stat").and_return( + ["usage_usec 378477588", "user_usec 340000000", "system_usec 38477588"] + ) + end + + it "returns usage_nanos converted from usage_usec (* 1000)" do + result = cpuacct_resource_v2.to_hash + expect(result[:control_group]).to eq("/bar") + expect(result[:usage_nanos]).to eq(378477588 * 1000) + end + end + + context "when cpu.stat cannot be found" do + it "returns -1 for usage_nanos" do + expect(cpuacct_resource_v2.base_path).to eq("/sys/fs/cgroup") + expect(cpuacct_resource_v2.offset_path).to eq("/bar") + expect(cpuacct_resource_v2.to_hash).to eq({:control_group => "/bar", :usage_nanos => -1}) + end + end + end + + describe "method: to_hash, with override" do + before do + java.lang.System.setProperty("ls.cgroup.cpuacct.path.override", "/quux") + end + after do + java.lang.System.clearProperty("ls.cgroup.cpuacct.path.override") + end + context "when the files cannot be found" do + it "uses the overridden path" do + expect(cpuacct_resource_v2.base_path).to eq("/sys/fs/cgroup") + expect(cpuacct_resource_v2.offset_path).to eq("/quux") + expect(cpuacct_resource_v2.to_hash).to eq({:control_group => "/quux", :usage_nanos => -1}) + end + end + end + end + + describe Cgroup::CpuResourceV2 do + subject(:cpu_resource_v2) { described_class.new("/bar") } + + describe "method: to_hash, without override" do + context "when cpu.max has a numeric quota" do + before do + allow(::File).to receive(:exist?).and_return(true) + allow(IO).to receive(:readlines).with("/sys/fs/cgroup/bar/cpu.max").and_return(["150000 100000"]) + allow(IO).to receive(:readlines).with("/sys/fs/cgroup/bar/cpu.stat").and_return( + ["usage_usec 378477588", "nr_periods 4157", "nr_throttled 460", "throttled_usec 581617440"] + ) + end + + it "parses cpu.max correctly" do + result = cpu_resource_v2.to_hash + expect(result[:control_group]).to eq("/bar") + expect(result[:cfs_quota_micros]).to eq(150000) + expect(result[:cfs_period_micros]).to eq(100000) + expect(result[:stat][:number_of_elapsed_periods]).to eq(4157) + expect(result[:stat][:number_of_times_throttled]).to eq(460) + expect(result[:stat][:time_throttled_nanos]).to eq(581617440 * 1000) + end + end + + context "when cpu.max has 'max' (unlimited) quota" do + before do + allow(::File).to receive(:exist?).and_return(true) + allow(IO).to receive(:readlines).with("/sys/fs/cgroup/bar/cpu.max").and_return(["max 100000"]) + allow(IO).to receive(:readlines).with("/sys/fs/cgroup/bar/cpu.stat").and_return([]) + end + + it "returns -1 for quota" do + result = cpu_resource_v2.to_hash + expect(result[:cfs_quota_micros]).to eq(-1) + expect(result[:cfs_period_micros]).to eq(100000) + end + end + + context "when files cannot be found" do + it "returns -1s" do + expect(cpu_resource_v2.base_path).to eq("/sys/fs/cgroup") + expect(cpu_resource_v2.offset_path).to eq("/bar") + expect(cpu_resource_v2.to_hash).to eq({ + :cfs_period_micros => -1, + :cfs_quota_micros => -1, + :control_group => "/bar", + :stat => { + :number_of_elapsed_periods => -1, + :number_of_times_throttled => -1, + :time_throttled_nanos => -1 + } + }) + end + end + end + + describe "method: to_hash, with override" do + before do + java.lang.System.setProperty("ls.cgroup.cpu.path.override", "/quux") + end + after do + java.lang.System.clearProperty("ls.cgroup.cpu.path.override") + end + context "when the files cannot be found" do + it "uses the overridden path" do + expect(cpu_resource_v2.base_path).to eq("/sys/fs/cgroup") + expect(cpu_resource_v2.offset_path).to eq("/quux") + expect(cpu_resource_v2.to_hash).to eq({ + :cfs_period_micros => -1, + :cfs_quota_micros => -1, + :control_group => "/quux", + :stat => { + :number_of_elapsed_periods => -1, + :number_of_times_throttled => -1, + :time_throttled_nanos => -1 + } + }) + end + end + end + end + + describe Cgroup do + describe "class method: get_all (v2)" do + let(:v2_relative_path) { "/system.slice/docker-abc123.scope" } + let(:proc_self_cgroup_v2) { ["0::#{v2_relative_path}"] } + let(:usage_usec) { 378477588 } + let(:cpu_max_content) { ["150000 100000"] } + let(:cpu_stats_number_of_periods) { 4157 } + let(:cpu_stats_number_of_time_throttled) { 460 } + let(:cpu_stats_throttled_usec) { 581617440 } + let(:cpu_stat_file_content) do + [ + "usage_usec #{usage_usec}", + "nr_periods #{cpu_stats_number_of_periods}", + "nr_throttled #{cpu_stats_number_of_time_throttled}", + "throttled_usec #{cpu_stats_throttled_usec}" + ] + end + + before do + allow(::File).to receive(:exist?).and_return(false) + allow(::File).to receive(:exist?).with("/proc/self/cgroup").and_return(true) + allow(::File).to receive(:exist?).with("/sys/fs/cgroup/cgroup.controllers").and_return(true) + # v2 file paths + allow(::File).to receive(:exist?).with("/sys/fs/cgroup#{v2_relative_path}/cpu.stat").and_return(true) + allow(::File).to receive(:exist?).with("/sys/fs/cgroup#{v2_relative_path}/cpu.max").and_return(true) + allow(IO).to receive(:readlines).with("/proc/self/cgroup").and_return(proc_self_cgroup_v2) + allow(IO).to receive(:readlines).with("/sys/fs/cgroup#{v2_relative_path}/cpu.stat").and_return(cpu_stat_file_content) + allow(IO).to receive(:readlines).with("/sys/fs/cgroup#{v2_relative_path}/cpu.max").and_return(cpu_max_content) + end + + it "returns all the stats with the same hash structure as v1" do + expect(described_class.get_all).to match( + :cpuacct => { + :control_group => v2_relative_path, + :usage_nanos => usage_usec * 1000, + }, + :cpu => { + :control_group => v2_relative_path, + :cfs_period_micros => 100000, + :cfs_quota_micros => 150000, + :stat => { + :number_of_elapsed_periods => cpu_stats_number_of_periods, + :number_of_times_throttled => cpu_stats_number_of_time_throttled, + :time_throttled_nanos => cpu_stats_throttled_usec * 1000 + } + } + ) + end + end + + describe "class method: get_all (hybrid mode, v1 takes priority)" do + let(:hybrid_cgroup_content) do + %W(4:cpuacct:#{relative_path} + 3:cpu:#{relative_path} + 0::/docker) + end + let(:cpuacct_usage) { 1982 } + let(:cfs_period_micros) { 500 } + let(:cfs_quota_micros) { 98 } + let(:cpu_stats_number_of_periods) { 1 } + let(:cpu_stats_number_of_time_throttled) { 2 } + let(:cpu_stats_time_throttled_nanos) { 3 } + let(:cpu_stat_file_content) do + ["nr_periods #{cpu_stats_number_of_periods}", "nr_throttled #{cpu_stats_number_of_time_throttled}", "throttled_time #{cpu_stats_time_throttled_nanos}"] + end + + before do + allow(::File).to receive(:exist?).and_return(true) + allow(IO).to receive(:readlines).with("/proc/self/cgroup").and_return(hybrid_cgroup_content) + allow(IO).to receive(:readlines).with("/sys/fs/cgroup/cpuacct#{relative_path}/cpuacct.usage").and_return([cpuacct_usage]) + allow(IO).to receive(:readlines).with("/sys/fs/cgroup/cpu#{relative_path}/cpu.cfs_period_us").and_return([cfs_period_micros]) + allow(IO).to receive(:readlines).with("/sys/fs/cgroup/cpu#{relative_path}/cpu.cfs_quota_us").and_return([cfs_quota_micros]) + allow(IO).to receive(:readlines).with("/sys/fs/cgroup/cpu#{relative_path}/cpu.stat").and_return(cpu_stat_file_content) + end + + it "uses v1 resources, not v2" do + result = described_class.get_all + expect(result[:cpuacct][:control_group]).to eq(relative_path) + expect(result[:cpuacct][:usage_nanos]).to eq(cpuacct_usage) + expect(result[:cpu][:control_group]).to eq(relative_path) + expect(result[:cpu][:cfs_period_micros]).to eq(cfs_period_micros) + expect(result[:cpu][:cfs_quota_micros]).to eq(cfs_quota_micros) + end + end + end end end end end diff --git a/logstash-core/spec/logstash/instrument/periodic_poller/os_spec.rb b/logstash-core/spec/logstash/instrument/periodic_poller/os_spec.rb index 2edd9bc1930..55f12c0482a 100644 --- a/logstash-core/spec/logstash/instrument/periodic_poller/os_spec.rb +++ b/logstash-core/spec/logstash/instrument/periodic_poller/os_spec.rb @@ -94,4 +94,73 @@ def mval(*metric_path) end end end + + context "recorded cgroup v2 metrics (mocked cgroup env)" do + subject { described_class.new(metric, {})} + + let(:snapshot_store) { metric.collector.snapshot_metric.metric_store } + let(:os_metrics) { snapshot_store.get_shallow(:os) } + + let(:v2_relative_path) { "/system.slice/docker-abc123def456.scope" } + let(:proc_self_cgroup_v2) { ["0::#{v2_relative_path}"] } + + let(:usage_usec) { 378477588 } + let(:cpu_period_micros) { 100000 } + let(:cpu_quota_micros) { 150000 } + let(:cpu_stats_number_of_periods) { 4157 } + let(:cpu_stats_number_of_time_throttled) { 460 } + let(:cpu_stats_throttled_usec) { 581617440 } + + let(:cpu_stat_file_content) do + [ + "usage_usec #{usage_usec}", + "user_usec 340000000", + "system_usec 38477588", + "nr_periods #{cpu_stats_number_of_periods}", + "nr_throttled #{cpu_stats_number_of_time_throttled}", + "throttled_usec #{cpu_stats_throttled_usec}" + ] + end + + let(:cpu_max_content) { ["#{cpu_quota_micros} #{cpu_period_micros}"] } + + before do + allow(::File).to receive(:exist?).and_return(false) + allow(::File).to receive(:exist?).with("/proc/self/cgroup").and_return(true) + allow(::File).to receive(:exist?).with("/sys/fs/cgroup/cgroup.controllers").and_return(true) + allow(::File).to receive(:exist?).with("/sys/fs/cgroup#{v2_relative_path}/cpu.stat").and_return(true) + allow(::File).to receive(:exist?).with("/sys/fs/cgroup#{v2_relative_path}/cpu.max").and_return(true) + allow(IO).to receive(:readlines).with("/proc/self/cgroup").and_return(proc_self_cgroup_v2) + allow(IO).to receive(:readlines).with("/sys/fs/cgroup#{v2_relative_path}/cpu.stat").and_return(cpu_stat_file_content) + allow(IO).to receive(:readlines).with("/sys/fs/cgroup#{v2_relative_path}/cpu.max").and_return(cpu_max_content) + + subject.collect + end + + def mval(*metric_path) + metric_path.reduce(os_metrics) {|acc, k| acc[k]}.value + end + + it "should have a value for #{[:cgroup, :cpuacct, :control_group]} that is a String" do + expect(mval(:cgroup, :cpuacct, :control_group)).to be_a(String) + end + + it "should have a value for #{[:cgroup, :cpu, :control_group]} that is a String" do + expect(mval(:cgroup, :cpu, :control_group)).to be_a(String) + end + + [ + [:cgroup, :cpuacct, :usage_nanos], + [:cgroup, :cpu, :cfs_period_micros], + [:cgroup, :cpu, :cfs_quota_micros], + [:cgroup, :cpu, :stat, :number_of_elapsed_periods], + [:cgroup, :cpu, :stat, :number_of_times_throttled], + [:cgroup, :cpu, :stat, :time_throttled_nanos] + ].each do |path| + path = Array(path) + it "should have a value for #{path} that is Numeric" do + expect(mval(*path)).to be_a(Numeric) + end + end + end end