Finally got back to this, and I think this commit may be the last one
we need before we can include Bas's commit to remove (the now
invalid) __del__ from packwriters. I tried to make the changes in a
way that's less likely to conflict with Johannes' pending work
(e.g. my use of the packwriter_guards), but if that's not actually
true, we can just hold off until I get further through that series,
since this work is fairly easy to adapt or even recreate.
Now that I look, I suspect the most disruptive bit might be the
cmd.server changes.
I did restore one use of a try/except/else, not because it changes
the semantics (I hope), but because it seems like a useful indicator
that the close "goes with" the try, and any additional intervening
code (before the close) would be a mistake.
lib/bup/cmd/save.py | 15 ++++-
lib/bup/cmd/server.py | 131 +++++++++++++++++++-----------------
lib/bup/cmd/split.py | 18 ++++-
lib/bup/gc.py | 8 +--
lib/bup/rm.py | 7 +-
test/int/test_client.py | 145 +++++++++++++++++++---------------------
test/int/test_git.py | 73 ++++++++++----------
7 files changed, 213 insertions(+), 184 deletions(-)
diff --git a/lib/bup/cmd/save.py b/lib/bup/cmd/save.py
index 1325b4d9..970d4554 100755
--- a/lib/bup/cmd/save.py
+++ b/lib/bup/cmd/save.py
@@ -47,7 +47,9 @@ def before_saving_regular_file(name):
return
-def main(argv):
+def _guarded_main(argv, packwriter_guard):
+ # The packwriter_guard is a hack we'll only need until we're ready
+ # to restructure this code to use a context manager instead.
# Hack around lack of nonlocal vars in python 2
_nonlocal = {}
@@ -125,6 +127,7 @@ def main(argv):
cli = None
oldref = refname and git.read_ref(refname) or None
w = git.PackWriter(compression_level=opt.compress)
+ packwriter_guard.append(w)
handle_ctrl_c()
@@ -523,3 +526,13 @@ def main(argv):
if saved_errors:
log('WARNING: %d errors encountered while saving.\n' % len(saved_errors))
sys.exit(1)
+
+def main(argv):
+ # This hack avoids having to restructure/reindent main until we're
+ # ready.
+ packwriter_guard = []
+ try:
+ return _guarded_main(argv, packwriter_guard)
+ finally:
+ if packwriter_guard:
+ packwriter_guard[0].close()
diff --git a/lib/bup/cmd/server.py b/lib/bup/cmd/server.py
index 35744284..95d94251 100755
--- a/lib/bup/cmd/server.py
+++ b/lib/bup/cmd/server.py
@@ -90,50 +90,55 @@ def receive_objects_v2(conn, junk):
w = git.PackWriter(objcache_maker=None)
else:
w = git.PackWriter()
- while 1:
- ns = conn.read(4)
- if not ns:
- w.abort()
- raise Exception('object read: expected length header, got EOF\n')
- n = struct.unpack('!I', ns)[0]
- #debug2('expecting %d bytes\n' % n)
- if not n:
- debug1('bup server: received %d object%s.\n'
- % (w.count, w.count!=1 and "s" or ''))
- fullpath = w.close(run_midx=not dumb_server_mode)
- if fullpath:
- (dir, name) = os.path.split(fullpath)
- conn.write(b'%s.idx\n' % name)
- conn.ok()
- return
- elif n == 0xffffffff:
- debug2('bup server: receive-objects suspended.\n')
- suspended_w = w
- conn.ok()
- return
-
- shar = conn.read(20)
- crcr = struct.unpack('!I', conn.read(4))[0]
- n -= 20 + 4
- buf = conn.read(n) # object sizes in bup are reasonably small
- #debug2('read %d bytes\n' % n)
- _check(w, n, len(buf), 'object read: expected %d bytes, got %d\n')
- if not dumb_server_mode:
- oldpack = w.exists(shar, want_source=True)
- if oldpack:
- assert(not oldpack == True)
- assert(oldpack.endswith(b'.idx'))
- (dir,name) = os.path.split(oldpack)
- if not (name in suggested):
- debug1("bup server: suggesting index %s\n"
- % git.shorten_hash(name).decode('ascii'))
- debug1("bup server: because of object %s\n"
- % hexstr(shar))
- conn.write(b'index %s\n' % name)
- suggested.add(name)
- continue
- nw, crc = w._raw_write((buf,), sha=shar)
- _check(w, crcr, crc, 'object read: expected crc %d, got %d\n')
+ try:
+ while 1:
+ ns = conn.read(4)
+ if not ns:
+ w.abort()
+ raise Exception('object read: expected length header, got EOF\n')
+ n = struct.unpack('!I', ns)[0]
+ #debug2('expecting %d bytes\n' % n)
+ if not n:
+ debug1('bup server: received %d object%s.\n'
+ % (w.count, w.count!=1 and "s" or ''))
+ fullpath = w.close(run_midx=not dumb_server_mode)
+ if fullpath:
+ (dir, name) = os.path.split(fullpath)
+ conn.write(b'%s.idx\n' % name)
+ conn.ok()
+ return
+ elif n == 0xffffffff:
+ debug2('bup server: receive-objects suspended.\n')
+ suspended_w = w
+ w = None
+ conn.ok()
+ return
+
+ shar = conn.read(20)
+ crcr = struct.unpack('!I', conn.read(4))[0]
+ n -= 20 + 4
+ buf = conn.read(n) # object sizes in bup are reasonably small
+ #debug2('read %d bytes\n' % n)
+ _check(w, n, len(buf), 'object read: expected %d bytes, got %d\n')
+ if not dumb_server_mode:
+ oldpack = w.exists(shar, want_source=True)
+ if oldpack:
+ assert(not oldpack == True)
+ assert(oldpack.endswith(b'.idx'))
+ (dir,name) = os.path.split(oldpack)
+ if not (name in suggested):
+ debug1("bup server: suggesting index %s\n"
+ % git.shorten_hash(name).decode('ascii'))
+ debug1("bup server: because of object %s\n"
+ % hexstr(shar))
+ conn.write(b'index %s\n' % name)
+ suggested.add(name)
+ continue
+ nw, crc = w._raw_write((buf,), sha=shar)
+ _check(w, crcr, crc, 'object read: expected crc %d, got %d\n')
+ finally:
+ if w:
+ w.close()
# NOTREACHED
@@ -278,9 +283,10 @@ commands = {
}
def main(argv):
+ global suspended_w
+
o = options.Options(optspec)
opt, flags, extra = o.parse_bytes(argv[1:])
-
if extra:
o.fatal('no arguments expected')
@@ -291,21 +297,26 @@ def main(argv):
sys.stdout.flush()
conn = Conn(byte_stream(sys.stdin), byte_stream(sys.stdout))
lr = linereader(conn)
- for _line in lr:
- line = _line.strip()
- if not line:
- continue
- debug1('bup server: command: %r\n' % line)
- words = line.split(b' ', 1)
- cmd = words[0]
- rest = len(words)>1 and words[1] or b''
- if cmd == b'quit':
- break
- else:
- cmd = commands.get(cmd)
- if cmd:
- cmd(conn, rest)
+ try:
+ for _line in lr:
+ line = _line.strip()
+ if not line:
+ continue
+ debug1('bup server: command: %r\n' % line)
+ words = line.split(b' ', 1)
+ cmd = words[0]
+ rest = len(words)>1 and words[1] or b''
+ if cmd == b'quit':
+ break
else:
- raise Exception('unknown server command: %r\n' % line)
+ cmd = commands.get(cmd)
+ if cmd:
+ cmd(conn, rest)
+ else:
+ raise Exception('unknown server command: %r\n' % line)
+ finally:
+ if suspended_w:
+ suspended_w.close()
+ suspended_w = None
debug1('bup server: done\n')
diff --git a/lib/bup/cmd/split.py b/lib/bup/cmd/split.py
index 11392e20..f8a007d0 100755
--- a/lib/bup/cmd/split.py
+++ b/lib/bup/cmd/split.py
@@ -41,7 +41,10 @@ bwlimit= maximum bytes/sec to transmit to server
#,compress= set compression level to # (0-9, 9 is highest) [1]
"""
-def main(argv):
+def _guarded_main(argv, packwriter_guard):
+ # The packwriter_guard is a hack we'll only need until we're ready
+ # to restructure this code to use a context manager instead.
+
o = options.Options(optspec)
opt, flags, extra = o.parse_bytes(argv[1:])
    if opt.name: opt.name = argv_bytes(opt.name)
@@ -118,6 +121,8 @@ def main(argv):
pack_writer = git.PackWriter(compression_level=opt.compress,
max_pack_size=max_pack_size,
max_pack_objects=max_pack_objects)
+ if pack_writer:
+ packwriter_guard.append(pack_writer)
input = byte_stream(sys.stdin)
@@ -234,3 +239,14 @@ def main(argv):
if saved_errors:
log('WARNING: %d errors encountered while saving.\n' % len(saved_errors))
sys.exit(1)
+
+def main(argv):
+ # This hack avoids having to restructure/reindent main until we're
+ # ready.
+ packwriter_guard = []
+ try:
+ return _guarded_main(argv, packwriter_guard)
+ finally:
+        if packwriter_guard:
+            packwriter_guard[0].close()
diff --git a/lib/bup/gc.py b/lib/bup/gc.py
index b757034c..e217372e 100644
--- a/lib/bup/gc.py
+++ b/lib/bup/gc.py
@@ -209,10 +209,10 @@ def sweep(live_objects, existing_count, cat_pipe, threshold, compression,
except BaseException as ex:
with pending_raise(ex):
writer.abort()
-
- # This will finally run midx.
- # Can only change refs (if needed) after this.
- writer.close()
+ else:
+ # This will finally run midx.
+ # Can only change refs (if needed) after this.
+ writer.close()
remove_stale_files(None) # In case we didn't write to the writer.
diff --git a/lib/bup/rm.py b/lib/bup/rm.py
index 844bdf15..e7b34f78 100644
--- a/lib/bup/rm.py
+++ b/lib/bup/rm.py
@@ -116,10 +116,9 @@ def bup_rm(repo, paths, compression=6, verbosity=None):
assert(saves)
updated_refs[b'refs/heads/' + branch] = rm_saves(saves, writer)
except BaseException as ex:
- if writer:
- with pending_raise(ex):
- writer.abort()
- if writer:
+ with pending_raise(ex):
+ writer.abort()
+ else:
# Must close before we can update the ref(s) below.
writer.close()
diff --git a/test/int/test_client.py b/test/int/test_client.py
index b3cdba4b..66f3214a 100644
--- a/test/int/test_client.py
+++ b/test/int/test_client.py
@@ -23,76 +23,70 @@ IDX_PAT = b'/*.idx'
def test_server_split_with_indexes(tmpdir):
environ[b'BUP_DIR'] = bupdir = tmpdir
git.init_repo(bupdir)
- lw = git.PackWriter()
- c = client.Client(bupdir, create=True)
- rw = c.new_packwriter()
-
- lw.new_blob(s1)
- lw.close()
-
- rw.new_blob(s2)
- rw.breakpoint()
- rw.new_blob(s1)
- rw.close()
+ with git.PackWriter() as lw:
+ c = client.Client(bupdir, create=True)
+ with c.new_packwriter() as rw:
+ lw.new_blob(s1)
+ lw.close()
+ rw.new_blob(s2)
+ rw.breakpoint()
+ rw.new_blob(s1)
def test_multiple_suggestions(tmpdir):
environ[b'BUP_DIR'] = bupdir = tmpdir
git.init_repo(bupdir)
- lw = git.PackWriter()
- lw.new_blob(s1)
- lw.close()
- lw = git.PackWriter()
- lw.new_blob(s2)
- lw.close()
+ with git.PackWriter() as lw:
+ lw.new_blob(s1)
+ with git.PackWriter() as lw:
+ lw.new_blob(s2)
assert len(glob.glob(git.repo(b'objects/pack'+IDX_PAT))) == 2
c = client.Client(bupdir, create=True)
assert len(glob.glob(c.cachedir+IDX_PAT)) == 0
- rw = c.new_packwriter()
- s1sha = rw.new_blob(s1)
- assert rw.exists(s1sha)
- s2sha = rw.new_blob(s2)
-
- # This is a little hacky, but ensures that we test the
- # code under test. First, flush to ensure that we've
- # actually sent all the command ('receive-objects-v2')
- # and their data to the server. This may be needed if
- # the output buffer size is bigger than the data (both
- # command and objects) we're writing. To see the need
- # for this, change the object sizes at the beginning
- # of this file to be very small (e.g. 10 instead of 10k)
- c.conn.outp.flush()
-
- # Then, check if we've already received the idx files.
- # This may happen if we're preempted just after writing
- # the data, then the server runs and suggests, and only
- # then we continue in PackWriter_Remote::_raw_write()
- # and check the has_input(), in that case we'll receive
- # the idx still in the rw.new_blob() calls above.
- #
- # In most cases though, that doesn't happen, and we'll
- # get past the has_input() check before the server has
- # a chance to respond - it has to actually hash the new
- # object here, so it takes some time. So also break out
- # of the loop if the server has sent something on the
- # connection.
- #
- # Finally, abort this after a little while (about one
- # second) just in case something's actually broken.
- n = 0
- while (len(glob.glob(c.cachedir+IDX_PAT)) < 2 and
- not c.conn.has_input() and n < 10):
- time.sleep(0.1)
- n += 1
- assert len(glob.glob(c.cachedir+IDX_PAT)) == 2 or c.conn.has_input()
- rw.new_blob(s2)
- assert rw.objcache.exists(s1sha)
- assert rw.objcache.exists(s2sha)
- rw.new_blob(s3)
- assert len(glob.glob(c.cachedir+IDX_PAT)) == 2
- rw.close()
+ with c.new_packwriter() as rw:
+ s1sha = rw.new_blob(s1)
+ assert rw.exists(s1sha)
+ s2sha = rw.new_blob(s2)
+
+ # This is a little hacky, but ensures that we test the
+ # code under test. First, flush to ensure that we've
+ # actually sent all the command ('receive-objects-v2')
+ # and their data to the server. This may be needed if
+ # the output buffer size is bigger than the data (both
+ # command and objects) we're writing. To see the need
+ # for this, change the object sizes at the beginning
+ # of this file to be very small (e.g. 10 instead of 10k)
+ c.conn.outp.flush()
+
+ # Then, check if we've already received the idx files.
+ # This may happen if we're preempted just after writing
+ # the data, then the server runs and suggests, and only
+ # then we continue in PackWriter_Remote::_raw_write()
+ # and check the has_input(), in that case we'll receive
+ # the idx still in the rw.new_blob() calls above.
+ #
+ # In most cases though, that doesn't happen, and we'll
+ # get past the has_input() check before the server has
+ # a chance to respond - it has to actually hash the new
+ # object here, so it takes some time. So also break out
+ # of the loop if the server has sent something on the
+ # connection.
+ #
+ # Finally, abort this after a little while (about one
+ # second) just in case something's actually broken.
+ n = 0
+ while (len(glob.glob(c.cachedir+IDX_PAT)) < 2 and
+ not c.conn.has_input() and n < 10):
+ time.sleep(0.1)
+ n += 1
+ assert len(glob.glob(c.cachedir+IDX_PAT)) == 2 or c.conn.has_input()
+ rw.new_blob(s2)
+ assert rw.objcache.exists(s1sha)
+ assert rw.objcache.exists(s2sha)
+ rw.new_blob(s3)
+ assert len(glob.glob(c.cachedir+IDX_PAT)) == 2
assert len(glob.glob(c.cachedir+IDX_PAT)) == 3
@@ -101,17 +95,15 @@ def test_dumb_client_server(tmpdir):
git.init_repo(bupdir)
open(git.repo(b'bup-dumb-server'), 'w').close()
- lw = git.PackWriter()
- lw.new_blob(s1)
- lw.close()
+ with git.PackWriter() as lw:
+ lw.new_blob(s1)
c = client.Client(bupdir, create=True)
- rw = c.new_packwriter()
- assert len(glob.glob(c.cachedir+IDX_PAT)) == 1
- rw.new_blob(s1)
- assert len(glob.glob(c.cachedir+IDX_PAT)) == 1
- rw.new_blob(s2)
- rw.close()
+ with c.new_packwriter() as rw:
+ assert len(glob.glob(c.cachedir+IDX_PAT)) == 1
+ rw.new_blob(s1)
+ assert len(glob.glob(c.cachedir+IDX_PAT)) == 1
+ rw.new_blob(s2)
assert len(glob.glob(c.cachedir+IDX_PAT)) == 2
@@ -119,15 +111,14 @@ def test_midx_refreshing(tmpdir):
environ[b'BUP_DIR'] = bupdir = tmpdir
git.init_repo(bupdir)
c = client.Client(bupdir, create=True)
- rw = c.new_packwriter()
- rw.new_blob(s1)
- p1base = rw.breakpoint()
- p1name = os.path.join(c.cachedir, p1base)
- s1sha = rw.new_blob(s1) # should not be written; it's already in p1
- s2sha = rw.new_blob(s2)
- p2base = rw.close()
+ with c.new_packwriter() as rw:
+ rw.new_blob(s1)
+ p1base = rw.breakpoint()
+ p1name = os.path.join(c.cachedir, p1base)
+ s1sha = rw.new_blob(s1) # should not be written; it's already in p1
+ s2sha = rw.new_blob(s2)
+ p2base = rw.close()
p2name = os.path.join(c.cachedir, p2base)
- del rw
pi = git.PackIdxList(bupdir + b'/objects/pack')
assert len(pi.packs) == 2
diff --git a/test/int/test_git.py b/test/int/test_git.py
index 7965c60c..f5eab44c 100644
--- a/test/int/test_git.py
+++ b/test/int/test_git.py
@@ -117,18 +117,18 @@ def test_packs(tmpdir):
git.init_repo(bupdir)
git.verbose = 1
- w = git.PackWriter()
- w.new_blob(os.urandom(100))
- w.new_blob(os.urandom(100))
- w.abort()
-
- w = git.PackWriter()
- hashes = []
- nobj = 1000
- for i in range(nobj):
- hashes.append(w.new_blob(b'%d' % i))
- log('\n')
- nameprefix = w.close()
+ with git.PackWriter() as w:
+ w.new_blob(os.urandom(100))
+ w.new_blob(os.urandom(100))
+ w.abort()
+
+ with git.PackWriter() as w:
+ hashes = []
+ nobj = 1000
+ for i in range(nobj):
+ hashes.append(w.new_blob(b'%d' % i))
+ log('\n')
+ nameprefix = w.close()
print(repr(nameprefix))
WVPASS(os.path.exists(nameprefix + b'.pack'))
WVPASS(os.path.exists(nameprefix + b'.idx'))
@@ -163,11 +163,11 @@ def test_pack_name_lookup(tmpdir):
hashes = []
for start in range(0,28,2):
- w = git.PackWriter()
- for i in range(start, start+2):
- hashes.append(w.new_blob(b'%d' % i))
- log('\n')
- idxnames.append(os.path.basename(w.close() + b'.idx'))
+ with git.PackWriter() as w:
+ for i in range(start, start+2):
+ hashes.append(w.new_blob(b'%d' % i))
+ log('\n')
+ idxnames.append(os.path.basename(w.close() + b'.idx'))
r = git.PackIdxList(packdir)
WVPASSEQ(len(r.packs), 2)
@@ -310,31 +310,30 @@ def test_new_commit(tmpdir):
git.init_repo(bupdir)
git.verbose = 1
- w = git.PackWriter()
- tree = os.urandom(20)
- parent = os.urandom(20)
- author_name = b'Author'
- author_mail = b'author@somewhere'
- adate_sec = 1439657836
- cdate_sec = adate_sec + 1
- committer_name = b'Committer'
- committer_mail = b'committer@somewhere'
- adate_tz_sec = cdate_tz_sec = None
- commit = w.new_commit(tree, parent,
- b'%s <%s>' % (author_name, author_mail),
- adate_sec, adate_tz_sec,
- b'%s <%s>' % (committer_name, committer_mail),
- cdate_sec, cdate_tz_sec,
- b'There is a small mailbox here')
- adate_tz_sec = -60 * 60
- cdate_tz_sec = 120 * 60
- commit_off = w.new_commit(tree, parent,
+ with git.PackWriter() as w:
+ tree = os.urandom(20)
+ parent = os.urandom(20)
+ author_name = b'Author'
+ author_mail = b'author@somewhere'
+ adate_sec = 1439657836
+ cdate_sec = adate_sec + 1
+ committer_name = b'Committer'
+ committer_mail = b'committer@somewhere'
+ adate_tz_sec = cdate_tz_sec = None
+ commit = w.new_commit(tree, parent,
b'%s <%s>' % (author_name, author_mail),
adate_sec, adate_tz_sec,
b'%s <%s>' % (committer_name, committer_mail),
cdate_sec, cdate_tz_sec,
b'There is a small mailbox here')
- w.close()
+ adate_tz_sec = -60 * 60
+ cdate_tz_sec = 120 * 60
+ commit_off = w.new_commit(tree, parent,
+ b'%s <%s>' % (author_name, author_mail),
+ adate_sec, adate_tz_sec,
+ b'%s <%s>' % (committer_name, committer_mail),
+ cdate_sec, cdate_tz_sec,
+ b'There is a small mailbox here')
commit_items = git.get_commit_items(hexlify(commit), git.cp())
local_author_offset = localtime(adate_sec).tm_gmtoff
--
2.30.2