diff --git a/tools/upmap/test_upmap-remapped.py b/tools/upmap/test_upmap-remapped.py new file mode 100644 index 0000000..0d08888 --- /dev/null +++ b/tools/upmap/test_upmap-remapped.py @@ -0,0 +1,46 @@ +# Use importlib since we can't import a module that contains a hyphen +import importlib +upmap_remapped = importlib.import_module('upmap-remapped') + +def test_gen_upmap(monkeypatch): + OSDS = [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15] + DF = [ + {"id": 0, "crush_weight": 14, "reweight": 1}, + {"id": 1, "crush_weight": 8, "reweight": 1}, + {"id": 2, "crush_weight": 12, "reweight": 1}, + {"id": 3, "crush_weight": 8, "reweight": 1}, + {"id": 4, "crush_weight": 12, "reweight": 1}, + {"id": 5, "crush_weight": 10, "reweight": 1}, + {"id": 6, "crush_weight": 12, "reweight": 1}, + {"id": 7, "crush_weight": 8, "reweight": 1}, + {"id": 8, "crush_weight": 12, "reweight": 1}, + {"id": 9, "crush_weight": 8, "reweight": 1}, + {"id": 10, "crush_weight": 10, "reweight": 1}, + {"id": 11, "crush_weight": 10, "reweight": 1}, + {"id": 12, "crush_weight": 10, "reweight": 1}, + {"id": 13, "crush_weight": 10, "reweight": 1}, + {"id": 14, "crush_weight": 10, "reweight": 1}, + {"id": 15, "crush_weight": 10, "reweight": 1} + ] + + # Use monkeypatch to set the required globals + monkeypatch.setattr(upmap_remapped, "OSDS", OSDS) + monkeypatch.setattr(upmap_remapped, "DF", DF) + + # EC 4+2 - single + assert upmap_remapped.gen_upmap([2,11,5,9,15,12], [2,11,5,9,14,12]) == [(15,14)] + + # EC 4+2 - every osd remapped with no dependencies + assert upmap_remapped.gen_upmap([4,14,10,3,7,8], [9,13,2,15,5,11]) == [(4,9),(14,13),(10,2),(3,15),(7,5),(8,11)] + + # EC 4+2 - one dependency + assert upmap_remapped.gen_upmap([6,12,10,0,2,9], [6,12,9,0,2,4]) == [(9,4),(10,9)] + + # EC 4+2 - two dependencies + assert upmap_remapped.gen_upmap([3,4,12,6,11,0], [6,14,12,4,11,0]) == [(4,14),(6,4),(3,6)] + + # EC 4+2 - three dependencies plus one without a dependency + assert upmap_remapped.gen_upmap([7,1,12,2,15,9], [7,0,4,12,9,2]) == [(1,0),(12,4),(2,12),(9,2),(15,9)] + + # EC 4+2 - upmap not possible since osds are swapped + assert upmap_remapped.gen_upmap([9,4,7,10,14,2], [9,7,4,10,14,2]) == [] diff --git a/tools/upmap/upmap-remapped.py b/tools/upmap/upmap-remapped.py index 002e5b8..fae3a56 100755 --- a/tools/upmap/upmap-remapped.py +++ b/tools/upmap/upmap-remapped.py @@ -40,42 +40,147 @@ import json, subprocess, sys -try: - import rados - cluster = rados.Rados(conffile='/etc/ceph/ceph.conf') - cluster.connect() -except: - use_shell = True -else: - use_shell = False +OSDS = [] +DF = [] -def eprint(*args, **kwargs): - print(*args, file=sys.stderr, **kwargs) +def main(): + global OSDS + global DF -try: - if use_shell: - OSDS = json.loads(subprocess.getoutput('ceph osd ls -f json | jq -r .')) - DF = json.loads(subprocess.getoutput('ceph osd df -f json | jq -r .nodes')) + try: + import rados + cluster = rados.Rados(conffile='/etc/ceph/ceph.conf') + cluster.connect() + except: + use_shell = True else: - cmd = {"prefix": "osd ls", "format": "json"} - ret, output, errs = cluster.mon_command(json.dumps(cmd), b'', timeout=5) - output = output.decode('utf-8').strip() - OSDS = json.loads(output) - cmd = {"prefix": "osd df", "format": "json"} - ret, output, errs = cluster.mon_command(json.dumps(cmd), b'', timeout=5) - output = output.decode('utf-8').strip() - DF = json.loads(output)['nodes'] -except ValueError: - eprint('Error loading OSD IDs') - sys.exit(1) - -ignore_backfilling = False -for arg in sys.argv[1:]: - if arg == "--ignore-backfilling": - eprint ("All actively backfilling PGs will be ignored.") - ignore_backfilling = True + use_shell = False + + try: + if use_shell: + OSDS = json.loads(subprocess.getoutput('ceph osd ls -f json | jq -r .')) + DF = json.loads(subprocess.getoutput('ceph osd df -f json | jq -r .nodes')) + else: + cmd = {"prefix": "osd ls", "format": "json"} + ret, output, errs = cluster.mon_command(json.dumps(cmd), b'', timeout=5) + output = output.decode('utf-8').strip() + OSDS = json.loads(output) + cmd = {"prefix": "osd df", "format": "json"} + ret, output, errs = cluster.mon_command(json.dumps(cmd), b'', timeout=5) + output = output.decode('utf-8').strip() + DF = json.loads(output)['nodes'] + except ValueError: + eprint('Error loading OSD IDs') + sys.exit(1) + + ignore_backfilling = False + for arg in sys.argv[1:]: + if arg == "--ignore-backfilling": + eprint ("All actively backfilling PGs will be ignored.") + ignore_backfilling = True + + # discover remapped pgs + try: + if use_shell: + remapped_json = subprocess.getoutput('ceph pg ls remapped -f json | jq -r .') + else: + cmd = {"prefix": "pg ls", "states": ["remapped"], "format": "json"} + ret, output, err = cluster.mon_command(json.dumps(cmd), b'', timeout=5) + remapped_json = output.decode('utf-8').strip() + try: + remapped = json.loads(remapped_json)['pg_stats'] + except KeyError: + eprint("There are no remapped PGs") + sys.exit(0) + except ValueError: + eprint('Error loading remapped pgs') + sys.exit(1) + + # discover existing upmaps + try: + if use_shell: + osd_dump_json = subprocess.getoutput('ceph osd dump -f json | jq -r .') + else: + cmd = {"prefix": "osd dump", "format": "json"} + ret, output, errs = cluster.mon_command(json.dumps(cmd), b'', timeout=5) + osd_dump_json = output.decode('utf-8').strip() + upmaps = json.loads(osd_dump_json)['pg_upmap_items'] + except ValueError: + eprint('Error loading existing upmaps') + sys.exit(1) + + # discover pools replicated or erasure + pool_type = {} + try: + if use_shell: + osd_pool_ls_detail = subprocess.getoutput('ceph osd pool ls detail') + else: + cmd = {"prefix": "osd pool ls", "detail": "detail", "format": "plain"} + ret, output, errs = cluster.mon_command(json.dumps(cmd), b'', timeout=5) + osd_pool_ls_detail = output.decode('utf-8').strip() + for line in osd_pool_ls_detail.split('\n'): + if 'pool' in line: + x = line.split(' ') + pool_type[x[1]] = x[3] + except: + eprint('Error parsing pool types') + sys.exit(1) + + # discover if each pg is already upmapped + has_upmap = {} + for pg in upmaps: + pgid = str(pg['pgid']) + has_upmap[pgid] = True + + # handle each remapped pg + print('while ceph status | grep -q "peering\|activating\|laggy"; do sleep 2; done') + num = 0 + for pg in remapped: + if num == 50: + print('wait; sleep 4; while ceph status | grep -q "peering\|activating\|laggy"; do sleep 2; done') + num = 0 + + if ignore_backfilling: + if "backfilling" in pg['state']: + continue + + pgid = pg['pgid'] + + try: + if has_upmap[pgid]: + rm_upmap_pg_items(pgid) + num += 1 + continue + except KeyError: + pass + + up = pg['up'] + acting = pg['acting'] + pool = pgid.split('.')[0] + if pool_type[pool] == 'replicated': + try: + pairs = gen_upmap(up, acting, replicated=True) + except: + continue + elif pool_type[pool] == 'erasure': + try: + pairs = gen_upmap(up, acting) + except: + continue + else: + eprint('Unknown pool type for %s' % pool) + sys.exit(1) + upmap_pg_items(pgid, pairs) + num += 1 + + print('wait; sleep 4; while ceph status | grep -q "peering\|activating\|laggy"; do sleep 2; done') + cluster.shutdown() + +def eprint(*args, **kwargs): + print(*args, file=sys.stderr, **kwargs) def crush_weight(id): + global DF for o in DF: if o['id'] == id: return o['crush_weight'] * o['reweight'] @@ -131,102 +236,5 @@ def upmap_pg_items(pgid, mapping): def rm_upmap_pg_items(pgid): print('ceph osd rm-pg-upmap-items %s &' % pgid) - -# start here - -# discover remapped pgs -try: - if use_shell: - remapped_json = subprocess.getoutput('ceph pg ls remapped -f json | jq -r .') - else: - cmd = {"prefix": "pg ls", "states": ["remapped"], "format": "json"} - ret, output, err = cluster.mon_command(json.dumps(cmd), b'', timeout=5) - remapped_json = output.decode('utf-8').strip() - try: - remapped = json.loads(remapped_json)['pg_stats'] - except KeyError: - eprint("There are no remapped PGs") - sys.exit(0) -except ValueError: - eprint('Error loading remapped pgs') - sys.exit(1) - -# discover existing upmaps -try: - if use_shell: - osd_dump_json = subprocess.getoutput('ceph osd dump -f json | jq -r .') - else: - cmd = {"prefix": "osd dump", "format": "json"} - ret, output, errs = cluster.mon_command(json.dumps(cmd), b'', timeout=5) - osd_dump_json = output.decode('utf-8').strip() - upmaps = json.loads(osd_dump_json)['pg_upmap_items'] -except ValueError: - eprint('Error loading existing upmaps') - sys.exit(1) - -# discover pools replicated or erasure -pool_type = {} -try: - if use_shell: - osd_pool_ls_detail = subprocess.getoutput('ceph osd pool ls detail') - else: - cmd = {"prefix": "osd pool ls", "detail": "detail", "format": "plain"} - ret, output, errs = cluster.mon_command(json.dumps(cmd), b'', timeout=5) - osd_pool_ls_detail = output.decode('utf-8').strip() - for line in osd_pool_ls_detail.split('\n'): - if 'pool' in line: - x = line.split(' ') - pool_type[x[1]] = x[3] -except: - eprint('Error parsing pool types') - sys.exit(1) - -# discover if each pg is already upmapped -has_upmap = {} -for pg in upmaps: - pgid = str(pg['pgid']) - has_upmap[pgid] = True - -# handle each remapped pg -print('while ceph status | grep -q "peering\|activating\|laggy"; do sleep 2; done') -num = 0 -for pg in remapped: - if num == 50: - print('wait; sleep 4; while ceph status | grep -q "peering\|activating\|laggy"; do sleep 2; done') - num = 0 - - if ignore_backfilling: - if "backfilling" in pg['state']: - continue - - pgid = pg['pgid'] - - try: - if has_upmap[pgid]: - rm_upmap_pg_items(pgid) - num += 1 - continue - except KeyError: - pass - - up = pg['up'] - acting = pg['acting'] - pool = pgid.split('.')[0] - if pool_type[pool] == 'replicated': - try: - pairs = gen_upmap(up, acting, replicated=True) - except: - continue - elif pool_type[pool] == 'erasure': - try: - pairs = gen_upmap(up, acting) - except: - continue - else: - eprint('Unknown pool type for %s' % pool) - sys.exit(1) - upmap_pg_items(pgid, pairs) - num += 1 - -print('wait; sleep 4; while ceph status | grep -q "peering\|activating\|laggy"; do sleep 2; done') -cluster.shutdown() +if __name__ == "__main__": + main()