[PATCH 22/23] Fully (and explicitly) close PackIdxLists

0 views
Skip to first unread message

Rob Browning

unread,
Nov 13, 2021, 12:59:33 PM, 11/13/21
to bup-...@googlegroups.com, Bas Stottelaar
Also stop checking _mpi_count in __del__, since there are no guarantees
about whether or when __del__ will run (it could run after another
PackIdxList has already been opened).

Signed-off-by: Rob Browning <r...@defaultvalue.org>
Tested-by: Rob Browning <r...@defaultvalue.org>
---
lib/bup/client.py | 11 ++--
lib/bup/cmd/margin.py | 70 ++++++++++++------------
lib/bup/cmd/memtest.py | 47 ++++++++--------
lib/bup/cmd/tag.py | 8 +--
lib/bup/git.py | 116 +++++++++++++++++++++++++---------------
lib/bup/helpers.py | 6 ++-
test/int/test_client.py | 47 ++++++++--------
test/int/test_git.py | 74 ++++++++++++-------------
8 files changed, 208 insertions(+), 171 deletions(-)

diff --git a/lib/bup/client.py b/lib/bup/client.py
index 637737bb..ed902b74 100644
--- a/lib/bup/client.py
+++ b/lib/bup/client.py
@@ -10,7 +10,7 @@ from bup import git, ssh, vfs
from bup.compat import environ, pending_raise, range, reraise
from bup.helpers import (Conn, atomically_replaced_file, chunkyreader, debug1,
debug2, linereader, lines_until_sentinel,
- mkdirp, progress, qprogress, DemuxConn)
+ mkdirp, nullcontext_if_not, progress, qprogress, DemuxConn)
from bup.io import path_msg
from bup.vint import write_bvec

@@ -515,13 +515,16 @@ class PackWriter_Remote(git.PackWriter):
# Called by other PackWriter methods like breakpoint().
# Must not close the connection (self.file)
assert(run_midx) # We don't support this via remote yet
- if self._packopen and self.file:
+ self.objcache, objcache = None, self.objcache
+ with nullcontext_if_not(objcache):
+ if not (self._packopen and self.file):
+ return None
self.file.write(b'\0\0\0\0')
self._packopen = False
self.onclose() # Unbusy
- self.objcache = None
+ if objcache is not None:
+ objcache.close()
return self.suggest_packs() # Returns last idx received
- return None

def close(self):
# Called by inherited __exit__
diff --git a/lib/bup/cmd/margin.py b/lib/bup/cmd/margin.py
index 07f2b0f7..7836c719 100755
--- a/lib/bup/cmd/margin.py
+++ b/lib/bup/cmd/margin.py
@@ -24,44 +24,46 @@ def main(argv):

git.check_repo_or_die()

- mi = git.PackIdxList(git.repo(b'objects/pack'), ignore_midx=opt.ignore_midx)
+ with git.PackIdxList(git.repo(b'objects/pack'),
+ ignore_midx=opt.ignore_midx) as mi:

- def do_predict(ix, out):
- total = len(ix)
- maxdiff = 0
- for count,i in enumerate(ix):
- prefix = struct.unpack('!Q', i[:8])[0]
- expected = prefix * total // (1 << 64)
- diff = count - expected
- maxdiff = max(maxdiff, abs(diff))
- out.write(b'%d of %d (%.3f%%) '
- % (maxdiff, len(ix), maxdiff * 100.0 / len(ix)))
- out.flush()
- assert(count+1 == len(ix))
+ def do_predict(ix, out):
+ total = len(ix)
+ maxdiff = 0
+ for count,i in enumerate(ix):
+ prefix = struct.unpack('!Q', i[:8])[0]
+ expected = prefix * total // (1 << 64)
+ diff = count - expected
+ maxdiff = max(maxdiff, abs(diff))
+ out.write(b'%d of %d (%.3f%%) '
+ % (maxdiff, len(ix), maxdiff * 100.0 / len(ix)))
+ out.flush()
+ assert(count+1 == len(ix))

- sys.stdout.flush()
- out = byte_stream(sys.stdout)
+ sys.stdout.flush()
+ out = byte_stream(sys.stdout)

- if opt.predict:
- if opt.ignore_midx:
- for pack in mi.packs:
- do_predict(pack, out)
+ if opt.predict:
+ if opt.ignore_midx:
+ for pack in mi.packs:
+ do_predict(pack, out)
+ else:
+ do_predict(mi, out)
else:
- do_predict(mi, out)
- else:
- # default mode: find longest matching prefix
- last = b'\0'*20
- longmatch = 0
- for i in mi:
- if i == last:
- continue
- #assert(str(i) >= last)
- pm = _helpers.bitmatch(last, i)
- longmatch = max(longmatch, pm)
- last = i
- out.write(b'%d\n' % longmatch)
- log('%d matching prefix bits\n' % longmatch)
- doublings = math.log(len(mi), 2)
+ # default mode: find longest matching prefix
+ last = b'\0'*20
+ longmatch = 0
+ for i in mi:
+ if i == last:
+ continue
+ #assert(str(i) >= last)
+ pm = _helpers.bitmatch(last, i)
+ longmatch = max(longmatch, pm)
+ last = i
+ out.write(b'%d\n' % longmatch)
+ log('%d matching prefix bits\n' % longmatch)
+ doublings = math.log(len(mi), 2)
+
bpd = longmatch / doublings
log('%.2f bits per doubling\n' % bpd)
remain = 160 - longmatch
diff --git a/lib/bup/cmd/memtest.py b/lib/bup/cmd/memtest.py
index b2027c42..e20c1b67 100755
--- a/lib/bup/cmd/memtest.py
+++ b/lib/bup/cmd/memtest.py
@@ -79,8 +79,6 @@ def main(argv):
o.fatal('no arguments expected')

git.check_repo_or_die()
- m = git.PackIdxList(git.repo(b'objects/pack'), ignore_midx=opt.ignore_midx)
-
sys.stdout.flush()
out = byte_stream(sys.stdout)

@@ -88,27 +86,30 @@ def main(argv):
_helpers.random_sha()
report(0, out)

- if opt.existing:
- def foreverit(mi):
- while 1:
- for e in mi:
- yield e
- objit = iter(foreverit(m))
-
- for c in range(opt.cycles):
- for n in range(opt.number):
- if opt.existing:
- bin = next(objit)
- assert(m.exists(bin))
- else:
- bin = _helpers.random_sha()
-
- # technically, a randomly generated object id might exist.
- # but the likelihood of that is the likelihood of finding
- # a collision in sha-1 by accident, which is so unlikely that
- # we don't care.
- assert(not m.exists(bin))
- report((c+1)*opt.number, out)
+ with git.PackIdxList(git.repo(b'objects/pack'),
+ ignore_midx=opt.ignore_midx) as m:
+
+ if opt.existing:
+ def foreverit(mi):
+ while 1:
+ for e in mi:
+ yield e
+ objit = iter(foreverit(m))
+
+ for c in range(opt.cycles):
+ for n in range(opt.number):
+ if opt.existing:
+ bin = next(objit)
+ assert(m.exists(bin))
+ else:
+ bin = _helpers.random_sha()
+
+ # technically, a randomly generated object id might exist.
+ # but the likelihood of that is the likelihood of finding
+ # a collision in sha-1 by accident, which is so unlikely that
+ # we don't care.
+ assert(not m.exists(bin))
+ report((c+1)*opt.number, out)

if bloom._total_searches:
out.write(b'bloom: %d objects searched in %d steps: avg %.3f steps/object\n'
diff --git a/lib/bup/cmd/tag.py b/lib/bup/cmd/tag.py
index 0f5475d5..3bb1b409 100755
--- a/lib/bup/cmd/tag.py
+++ b/lib/bup/cmd/tag.py
@@ -74,10 +74,10 @@ def main(argv):
log("bup: error: commit %s not found.\n" % commit.decode('ascii'))
sys.exit(2)

- pL = git.PackIdxList(git.repo(b'objects/pack'))
- if not pL.exists(hash):
- log("bup: error: commit %s not found.\n" % commit.decode('ascii'))
- sys.exit(2)
+ with git.PackIdxList(git.repo(b'objects/pack')) as pL:
+ if not pL.exists(hash):
+ log("bup: error: commit %s not found.\n" % commit.decode('ascii'))
+ sys.exit(2)

tag_file = git.repo(b'refs/tags/' + tag_name)
try:
diff --git a/lib/bup/git.py b/lib/bup/git.py
index 0dc6e729..e3b235c5 100644
--- a/lib/bup/git.py
+++ b/lib/bup/git.py
@@ -14,6 +14,7 @@ from bup import _helpers, hashsplit, path, midx, bloom, xstat
from bup.compat import (buffer,
byte_int, bytes_from_byte, bytes_from_uint,
environ,
+ ExitStack,
items,
pending_raise,
range,
@@ -27,6 +28,7 @@ from bup.helpers import (Sha1, add_error, chunkyreader, debug1, debug2,
merge_dict,
merge_iter,
mmap_read, mmap_readwrite,
+ nullcontext_if_not,
progress, qprogress, stat_if_exists,
unlink,
utc_offset_str)
@@ -517,8 +519,10 @@ _mpi_count = 0
class PackIdxList:
def __init__(self, dir, ignore_midx=False):
global _mpi_count
+ # Q: was this also intended to prevent opening multiple repos?
assert(_mpi_count == 0) # these things suck tons of VM; don't waste it
_mpi_count += 1
+ self.open = True
self.dir = dir
self.also = set()
self.packs = []
@@ -527,10 +531,32 @@ class PackIdxList:
self.ignore_midx = ignore_midx
self.refresh()

- def __del__(self):
+ def close(self):
global _mpi_count
+ if not self.open:
+ assert _mpi_count == 0
+ return
_mpi_count -= 1
- assert(_mpi_count == 0)
+ assert _mpi_count == 0
+ self.also = None
+ self.bloom, bloom = None, self.bloom
+ self.packs, packs = None, self.packs
+ self.open = False
+ with ExitStack() as stack:
+ for pack in packs:
+ stack.enter_context(pack)
+ if bloom:
+ bloom.close()
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, type, value, traceback):
+ with pending_raise(value, rethrow=False):
+ self.close()
+
+ def __del__(self):
+ assert not self.open

def __iter__(self):
return iter(idxmerge(self.packs))
@@ -721,7 +747,6 @@ def create_commit_blob(tree, parent,
l.append(msg)
return b'\n'.join(l)

-
def _make_objcache():
return PackIdxList(repo(b'objects/pack'))

@@ -878,45 +903,49 @@ class PackWriter:
self.file, f = None, self.file
self.idx, idx = None, self.idx
self.parentfd, pfd, = None, self.parentfd
- self.objcache = None
-
- with finalized(pfd, lambda x: x is not None and os.close(x)), \
- f:

- if abort:
- os.unlink(self.filename + b'.pack')
- return None
+ try:
+ with nullcontext_if_not(self.objcache), \
+ finalized(pfd, lambda x: x is not None and os.close(x)), \
+ f:
+
+ if abort:
+ os.unlink(self.filename + b'.pack')
+ return None
+
+ # update object count
+ f.seek(8)
+ cp = struct.pack('!i', self.count)
+ assert len(cp) == 4
+ f.write(cp)
+
+ # calculate the pack sha1sum
+ f.seek(0)
+ sum = Sha1()
+ for b in chunkyreader(f):
+ sum.update(b)
+ packbin = sum.digest()
+ f.write(packbin)
+ f.flush()
+ fdatasync(f.fileno())
+ f.close()

- # update object count
- f.seek(8)
- cp = struct.pack('!i', self.count)
- assert len(cp) == 4
- f.write(cp)
-
- # calculate the pack sha1sum
- f.seek(0)
- sum = Sha1()
- for b in chunkyreader(f):
- sum.update(b)
- packbin = sum.digest()
- f.write(packbin)
- f.flush()
- fdatasync(f.fileno())
- f.close()
-
- idx.write(self.filename + b'.idx', packbin)
- nameprefix = os.path.join(self.repo_dir,
- b'objects/pack/pack-' + hexlify(packbin))
- if os.path.exists(self.filename + b'.map'):
- os.unlink(self.filename + b'.map')
- os.rename(self.filename + b'.pack', nameprefix + b'.pack')
- os.rename(self.filename + b'.idx', nameprefix + b'.idx')
- os.fsync(pfd)
- if run_midx:
- auto_midx(os.path.join(self.repo_dir, b'objects/pack'))
- if self.on_pack_finish:
- self.on_pack_finish(nameprefix)
- return nameprefix
+ idx.write(self.filename + b'.idx', packbin)
+ nameprefix = os.path.join(self.repo_dir,
+ b'objects/pack/pack-' + hexlify(packbin))
+ if os.path.exists(self.filename + b'.map'):
+ os.unlink(self.filename + b'.map')
+ os.rename(self.filename + b'.pack', nameprefix + b'.pack')
+ os.rename(self.filename + b'.idx', nameprefix + b'.idx')
+ os.fsync(pfd)
+ if run_midx:
+ auto_midx(os.path.join(self.repo_dir, b'objects/pack'))
+ if self.on_pack_finish:
+ self.on_pack_finish(nameprefix)
+ return nameprefix
+ finally:
+ # Must be last -- some of the code above depends on it
+ self.objcache = None

def abort(self):
"""Remove the pack file from disk."""
@@ -1090,16 +1119,15 @@ def rev_parse(committish, repo_dir=None):
debug2("resolved from ref: commit = %s\n" % hexlify(head))
return head

- pL = PackIdxList(repo(b'objects/pack', repo_dir=repo_dir))
-
if len(committish) == 40:
try:
hash = unhexlify(committish)
except TypeError:
return None

- if pL.exists(hash):
- return hash
+ with PackIdxList(repo(b'objects/pack', repo_dir=repo_dir)) as pL:
+ if pL.exists(hash):
+ return hash

return None

diff --git a/lib/bup/helpers.py b/lib/bup/helpers.py
index 3b37a2b7..0530b51a 100644
--- a/lib/bup/helpers.py
+++ b/lib/bup/helpers.py
@@ -12,7 +12,7 @@ import hashlib, heapq, math, operator, time, tempfile

from bup import _helpers
from bup import compat
-from bup.compat import argv_bytes, byte_int, pending_raise
+from bup.compat import argv_bytes, byte_int, nullcontext, pending_raise
from bup.io import byte_stream, path_msg
# This function should really be in helpers, not in bup.options. But we
# want options.py to be standalone so people can include it in other projects.
@@ -27,6 +27,10 @@ class Nonlocal:
pass


+def nullcontext_if_not(manager):
+ return manager if manager is not None else nullcontext()
+
+
@contextmanager
def finalized(enter_result=None, finalize=None):
assert finalize
diff --git a/test/int/test_client.py b/test/int/test_client.py
index 757c3ab2..de17ecea 100644
--- a/test/int/test_client.py
+++ b/test/int/test_client.py
@@ -23,12 +23,10 @@ IDX_PAT = b'/*.idx'
def test_server_split_with_indexes(tmpdir):
environ[b'BUP_DIR'] = bupdir = tmpdir
git.init_repo(bupdir)
- with git.PackWriter() as lw, \
- client.Client(bupdir, create=True) as c, \
- c.new_packwriter() as rw:
+ with git.PackWriter() as lw:
lw.new_blob(s1)
- lw.close()
-
+ with client.Client(bupdir, create=True) as c, \
+ c.new_packwriter() as rw:
rw.new_blob(s2)
rw.breakpoint()
rw.new_blob(s1)
@@ -122,25 +120,26 @@ def test_midx_refreshing(tmpdir):
p2base = rw.close()
p2name = os.path.join(c.cachedir, p2base)

- pi = git.PackIdxList(bupdir + b'/objects/pack')
- assert len(pi.packs) == 2
- pi.refresh()
- assert len(pi.packs) == 2
- assert sorted([os.path.basename(i.name) for i in pi.packs]) == sorted([p1base, p2base])
-
- with git.open_idx(p1name) as p1, \
- git.open_idx(p2name) as p2:
- assert p1.exists(s1sha)
- assert not p2.exists(s1sha)
- assert p2.exists(s2sha)
-
- subprocess.call([path.exe(), b'midx', b'-f'])
- pi.refresh()
- assert len(pi.packs) == 1
- pi.refresh(skip_midx=True)
- assert len(pi.packs) == 2
- pi.refresh(skip_midx=False)
- assert len(pi.packs) == 1
+ with git.PackIdxList(bupdir + b'/objects/pack') as pi:
+ assert len(pi.packs) == 2
+ pi.refresh()
+ assert len(pi.packs) == 2
+ assert sorted([os.path.basename(i.name) for i in pi.packs]) \
+ == sorted([p1base, p2base])
+
+ with git.open_idx(p1name) as p1, \
+ git.open_idx(p2name) as p2:
+ assert p1.exists(s1sha)
+ assert not p2.exists(s1sha)
+ assert p2.exists(s2sha)
+
+ subprocess.call([path.exe(), b'midx', b'-f'])
+ pi.refresh()
+ assert len(pi.packs) == 1
+ pi.refresh(skip_midx=True)
+ assert len(pi.packs) == 2
+ pi.refresh(skip_midx=False)
+ assert len(pi.packs) == 1


def test_remote_parsing():
diff --git a/test/int/test_git.py b/test/int/test_git.py
index cae56caf..616fc6e7 100644
--- a/test/int/test_git.py
+++ b/test/int/test_git.py
@@ -147,10 +147,10 @@ def test_packs(tmpdir):

WVFAIL(r.find_offset(b'\0'*20))

- r = git.PackIdxList(bupdir + b'/objects/pack')
- WVPASS(r.exists(hashes[5]))
- WVPASS(r.exists(hashes[6]))
- WVFAIL(r.exists(b'\0'*20))
+ with git.PackIdxList(bupdir + b'/objects/pack') as r:
+ WVPASS(r.exists(hashes[5]))
+ WVPASS(r.exists(hashes[6]))
+ WVFAIL(r.exists(b'\0'*20))


def test_pack_name_lookup(tmpdir):
@@ -169,11 +169,11 @@ def test_pack_name_lookup(tmpdir):
log('\n')
idxnames.append(os.path.basename(w.close() + b'.idx'))

- r = git.PackIdxList(packdir)
- WVPASSEQ(len(r.packs), 2)
- for e,idxname in enumerate(idxnames):
- for i in range(e*2, (e+1)*2):
- WVPASSEQ(idxname, r.exists(hashes[i], want_source=True))
+ with git.PackIdxList(packdir) as r:
+ WVPASSEQ(len(r.packs), 2)
+ for e,idxname in enumerate(idxnames):
+ for i in range(e*2, (e+1)*2):
+ WVPASSEQ(idxname, r.exists(hashes[i], want_source=True))


def test_long_index(tmpdir):
@@ -511,35 +511,35 @@ def test_midx_close(tmpdir):
for i in range(10):
_create_idx(tmpdir, i)
git.auto_midx(tmpdir)
- l = git.PackIdxList(tmpdir)
+ with git.PackIdxList(tmpdir) as l:
# this doesn't exist (yet)
- WVPASSEQ(None, l.exists(struct.pack('18xBB', 10, 0)))
- for i in range(10, 15):
- _create_idx(tmpdir, i)
- # delete the midx ...
- # TODO: why do we need to? git.auto_midx() below doesn't?!
- for fn in os.listdir(tmpdir):
- if fn.endswith(b'.midx'):
- os.unlink(os.path.join(tmpdir, fn))
- # and make a new one
- git.auto_midx(tmpdir)
- # check it still doesn't exist - we haven't refreshed
- WVPASSEQ(None, l.exists(struct.pack('18xBB', 10, 0)))
- # check that we still have the midx open, this really
- # just checks more for the kernel API ('deleted' string)
- for fn in openfiles():
- if not b'midx-' in fn:
- continue
- WVPASSEQ(True, b'deleted' in fn)
- # refresh the PackIdxList
- l.refresh()
- # and check that an object in pack 10 exists now
- WVPASSEQ(True, l.exists(struct.pack('18xBB', 10, 0)))
- for fn in openfiles():
- if not b'midx-' in fn:
- continue
- # check that we don't have it open anymore
- WVPASSEQ(False, b'deleted' in fn)
+ WVPASSEQ(None, l.exists(struct.pack('18xBB', 10, 0)))
+ for i in range(10, 15):
+ _create_idx(tmpdir, i)
+ # delete the midx ...
+ # TODO: why do we need to? git.auto_midx() below doesn't?!
+ for fn in os.listdir(tmpdir):
+ if fn.endswith(b'.midx'):
+ os.unlink(os.path.join(tmpdir, fn))
+ # and make a new one
+ git.auto_midx(tmpdir)
+ # check it still doesn't exist - we haven't refreshed
+ WVPASSEQ(None, l.exists(struct.pack('18xBB', 10, 0)))
+ # check that we still have the midx open, this really
+ # just checks more for the kernel API ('deleted' string)
+ for fn in openfiles():
+ if not b'midx-' in fn:
+ continue
+ WVPASSEQ(True, b'deleted' in fn)
+ # refresh the PackIdxList
+ l.refresh()
+ # and check that an object in pack 10 exists now
+ WVPASSEQ(True, l.exists(struct.pack('18xBB', 10, 0)))
+ for fn in openfiles():
+ if not b'midx-' in fn:
+ continue
+ # check that we don't have it open anymore
+ WVPASSEQ(False, b'deleted' in fn)

def test_config():
cfg_file = os.path.join(os.path.dirname(__file__), 'sample.conf')
--
2.30.2

Reply all
Reply to author
Forward
0 new messages