import
sys, os, time, subprocess, unittest
from
Queue
import
Queue
from
threading
import
Thread, current_thread
class
QubesVm:
def
__init__(
self
, name, template, is_running):
self
.name
=
name
self
.template
=
template
self
._is_running
=
is_running
self
._is_updated
=
False
def
is_running(
self
):
return
self
._is_running
def
is_template(
self
):
return
(
self
.template
is
None
)
def
is_updated(
self
):
return
self
._is_updated
def
run(
self
, command, autostart
=
False
, verbose
=
True
, user
=
None
, notify_function
=
None
,
passio
=
False
, localcmd
=
None
, gui
=
True
, filter_esc
=
True
):
qvm_command
=
[
'qvm-run'
]
if
autostart:
qvm_command
+
=
[
'--autostart'
]
else
:
qvm_command
+
=
[
'--no-autostart'
]
if
not
verbose:
qvm_command
+
=
[
'--quiet'
]
if
user:
qvm_command
+
=
[
'--user=%s'
%
user]
if
gui:
qvm_command
+
=
[
'--gui'
]
else
:
qvm_command
+
=
[
'--no-gui'
]
qvm_command
+
=
[
self
.name, command]
subprocess.call(qvm_command)
class
QubesHandler:
def
__init__(
self
):
raw_result
=
self
._get_raw_qubes_string()
self
._qubes
=
{}
for
qube_string
in
raw_result.split(
"\n"
):
if
len
(qube_string)>
0
:
qube_properties
=
qube_string.split(
"|"
)
qube_class
=
qube_properties[
0
]
qube_name
=
qube_properties[
1
]
qube_template
=
None
;
if
qube_properties[
2
] !
=
'-'
:
qube_template
=
qube_properties[
2
]
qube_running
=
True
if
qube_properties[
3
]
=
=
'Halted'
:
qube_running
=
False
if
qube_class !
=
"AdminVM"
:
new_qube
=
self
._create_qube(qube_name, qube_template, qube_running)
self
._qubes[qube_name]
=
new_qube
def
_create_qube(
self
, qube_name, qube_template, qube_running):
return
QubesVm(qube_name, qube_template, qube_running)
def
_get_raw_qubes_string(
self
):
return
subprocess.check_output([
'qvm-ls'
,
'--fields'
,
'CLASS,NAME,TEMPLATE,STATE'
,
'--raw-data'
])
def
_get_template_dictionary(
self
, template_vm, rank):
return
{
'name'
: template_vm.name,
'qvm'
: template_vm,
'rank'
: rank}
def
_get_ranked_templates(
self
):
templates_ranked
=
{}
for
vm
in
self
._qubes.values():
if
vm.is_template():
if
not
vm.name
in
templates_ranked:
templates_ranked[vm.name]
=
self
._get_template_dictionary(vm,
0
)
else
:
if
vm.template:
if
vm.is_running():
rank
=
3
else
:
rank
=
1
if
not
vm.template
in
templates_ranked:
templates_ranked[vm.template]
=
\
self
._get_template_dictionary(
self
._qubes[vm.template], rank)
else
:
templates_ranked[vm.template][
'rank'
]
+
=
rank
return
templates_ranked
def
get_templates(
self
):
templates_ranked
=
self
._get_ranked_templates()
return
[templates_ranked[template]
for
template
in
sorted
(templates_ranked,
key
=
lambda
name: templates_ranked[name][
'rank'
],
reverse
=
True
)]
def
update_template(
self
, template, print_function):
print_function(
'Updating template...'
)
try
:
template[
'qvm'
].run(
'touch /tmp/update-all; chmod go+r /tmp/update-all; { if [ -f /usr/bin/snap '
+
']; then /usr/bin/snap refresh || exit; fi; if [ -f /usr/bin/apt-get '
+
']; then /usr/bin/apt-get update && /usr/bin/apt-get -y upgrade </dev/null '
+
'&& /usr/bin/apt-get -y autoremove && /usr/bin/apt-get autoclean && '
+
'/sbin/poweroff; else /usr/bin/dnf -y upgrade </dev/null && /usr/bin/dnf '
+
'clean all && /usr/sbin/poweroff; fi; } 2>&1 | tee -a /tmp/update-all'
,
autostart
=
True
, verbose
=
False
, user
=
'root'
, notify_function
=
None
,
passio
=
False
, localcmd
=
None
, gui
=
False
, filter_esc
=
True
)
print_function(
'Update complete.'
)
except
:
print_function(
'Update failed.'
, error
=
True
)
raise
class
UpdateWorker(Thread):
def
__init__(
self
, name, queue, qubes_handler):
Thread.__init__(
self
)
self
.daemon
=
True
self
.name
=
name
self
._queue
=
queue
self
._qubes_handler
=
qubes_handler
self
._template
=
None
def
_print_function(
self
, text, error
=
False
):
if
error:
prefix
=
'ERR'
else
:
prefix
=
'INF'
prefix
+
=
' ['
+
self
.name
+
' '
+
self
._template[
'name'
]
+
'] '
print
prefix
+
text
def
run(
self
):
while
True
:
self
._template
=
{
'name'
:
'None'
}
self
._template
=
self
._queue.get()
try
:
self
._qubes_handler.update_template(
self
._template,
self
._print_function)
self
._template
=
{
'name'
:
'None'
}
finally
:
self
._template
=
{
'name'
:
'None'
}
self
._queue.task_done()
self
._template
=
{
'name'
:
'None'
}
class
UpdateHandler:
def
__init__(
self
):
self
._queue
=
Queue()
def
update_templates(
self
, thread_number):
qubes_handler
=
self
._get_qubes_handler()
print
'INF Updating templates: '
, [template[
'name'
]
for
template
in
qubes_handler.get_templates()]
workers
=
[
self
._get_worker(
'w'
+
str
(number), qubes_handler)
for
number
in
range
(thread_number)]
for
worker
in
workers:
worker.start()
for
template
in
qubes_handler.get_templates():
self
._queue.put(template, block
=
True
)
self
._queue.join()
def
_get_worker(
self
, name, qubes_handler):
return
UpdateWorker(name,
self
._queue, qubes_handler)
def
_get_qubes_handler(
self
):
return
QubesHandler()
class
MockQubesVm(QubesVm):
def
__init__(
self
, name, template, is_running):
QubesVm.__init__(
self
, name, template, is_running)
self
.name
=
name
self
.template
=
template
self
._is_running
=
is_running
self
._is_updated
=
False
def
is_running(
self
):
return
self
._is_running
def
is_template(
self
):
return
(
self
.template
is
None
)
def
is_updated(
self
):
return
self
._is_updated
def
__repr__(
self
):
return
"MockQubesVm(%s,%s,%s)"
%
(
self
.name,
self
.template,
self
._is_running)
def
__eq__(
self
, other):
return
repr
(
self
)
=
=
repr
(other)
def
run(
self
, command, autostart
=
False
, verbose
=
True
, user
=
None
, notify_function
=
None
,
passio
=
False
, localcmd
=
None
, gui
=
True
, filter_esc
=
True
):
if
(autostart):
self
._is_running
=
True
if
(user
=
=
'root'
and
self
._is_running):
self
._is_updated
=
True
self
._is_running
=
False
class
MockQubesHandler(QubesHandler, unittest.TestCase):
def
__init__(
self
,
*
args,
*
*
kwargs):
if
(args):
unittest.TestCase.__init__(
self
,
*
args,
*
*
kwargs)
self
.maxDiff
=
None
self
._template_f
=
MockQubesVm(
'fedora-24'
,
None
,
False
)
self
._template_fm
=
MockQubesVm(
'fedora-24-minimal'
,
None
,
True
)
self
._template_d
=
MockQubesVm(
'debian-8'
,
None
,
False
)
self
._template_ws
=
MockQubesVm(
'whonix-ws'
,
None
,
True
)
self
._template_wg
=
MockQubesVm(
'whonix-wg'
,
None
,
True
)
self
._ranked_f
=
{
'name'
:
'fedora-24'
,
'qvm'
:
self
._template_f,
'rank'
:
9
}
self
._ranked_fm
=
{
'name'
:
'fedora-24-minimal'
,
'qvm'
:
self
._template_fm,
'rank'
:
4
}
self
._ranked_d
=
{
'name'
:
'debian-8'
,
'qvm'
:
self
._template_d,
'rank'
:
5
}
self
._ranked_ws
=
{
'name'
:
'whonix-ws'
,
'qvm'
:
self
._template_ws,
'rank'
:
1
}
self
._ranked_wg
=
{
'name'
:
'whonix-wg'
,
'qvm'
:
self
._template_wg,
'rank'
:
3
}
self
._expected_ranked
=
{
'fedora-24'
:
self
._ranked_f,
'fedora-24-minimal'
:
self
._ranked_fm,
'debian-8'
:
self
._ranked_d,
'whonix-ws'
:
self
._ranked_ws,
'whonix-wg'
:
self
._ranked_wg }
self
._expected
=
[
self
._ranked_f,
self
._ranked_d,
self
._ranked_fm,
self
._ranked_wg,
self
._ranked_ws]
QubesHandler.__init__(
self
)
def
_create_qube(
self
, qube_name, qube_template, qube_running):
return
MockQubesVm(qube_name, qube_template, qube_running)
def
_get_raw_qubes_string(
self
):
return
'AppVM|sys-net|fedora-24|Running\n'
+
\
'AppVM|sys-whonix|whonix-wg|Running\n'
+
\
'AppVM|personal|debian-8|Halted\n'
+
\
'AppVM|sys-firewall|fedora-24-minimal|Running\n'
+
\
'TemplateVM|debian-8|-|Halted\n'
+
\
'AdminVM|dom0|-|Running\n'
+
\
'AppVM|gnupg|fedora-24-minimal|Halted\n'
+
\
'AppVM|disp5|fedora-24|Suspended\n'
+
\
'AppVM|anonymous|whonix-ws|Halted\n'
+
\
'TemplateVM|fedora-24-minimal|-|True\n'
+
\
'TemplateVM|whonix-ws|-|Running\n'
+
\
'TemplateVM|whonix-wg|-|Running\n'
+
\
'AppVM|work|debian-8|Suspended\n'
+
\
'AppVM|development|debian-8|Halted\n'
+
\
'AppVM|disp2|fedora-24|Running\n'
+
\
'TemplateVM|fedora-24|-|Halted\n'
def
get_updated_status(
self
):
return
[
self
._qubes[
self
._template_f.name].is_updated(),
self
._qubes[
self
._template_fm.name].is_updated(),
self
._qubes[
self
._template_d.name].is_updated(),
self
._qubes[
self
._template_ws.name].is_updated(),
self
._qubes[
self
._template_wg.name].is_updated()]
def
test_get_template_dictionary(
self
):
self
.assert_get_template_dictionary(
self
._template_f)
self
.assert_get_template_dictionary(
self
._template_fm)
self
.assert_get_template_dictionary(
self
._template_d)
self
.assert_get_template_dictionary(
self
._template_ws)
self
.assert_get_template_dictionary(
self
._template_wg)
def
assert_get_template_dictionary(
self
, template):
found
=
self
._get_template_dictionary(template,
len
(template.name))
self
.assertEquals(
3
,
len
(found.keys()))
self
.assertEquals(template, found[
'qvm'
])
self
.assertEquals(
len
(template.name), found[
'rank'
])
self
.assertEquals(template.name, found[
'name'
])
def
test_get_template_dictionary_custom(
self
):
template
=
MockQubesVm(
'test-template'
,
None
,
False
)
found
=
self
._get_template_dictionary(template,
0
)
self
.assertEquals(
3
,
len
(found.keys()))
self
.assertEquals(template, found[
'qvm'
])
self
.assertEquals(
0
, found[
'rank'
])
self
.assertEquals(
'test-template'
, found[
'name'
])
def
test_get_templates(
self
):
found
=
self
.get_templates()
self
.assertEquals(
self
._expected, found)
def
test_get_ranked_templates(
self
):
found
=
self
._get_ranked_templates()
self
.assertEquals(
self
._expected_ranked, found)
class
MockUpdateWorker(UpdateWorker):
def
_print_function(
self
, text, error
=
False
):
pass
class
MockUpdateHandler(UpdateHandler, unittest.TestCase):
def
__init__(
self
,
*
args,
*
*
kwargs):
UpdateHandler.__init__(
self
)
unittest.TestCase.__init__(
self
,
*
args,
*
*
kwargs)
self
.generate_qubes_handler()
def
_get_worker(
self
, name, qubes_handler):
return
MockUpdateWorker(name,
self
._queue, qubes_handler)
def
_get_qubes_handler(
self
):
return
self
._qubes_handler
def
generate_qubes_handler(
self
):
self
._qubes_handler
=
MockQubesHandler()
def
test_update_all(
self
):
for
number
in
range
(
1
,
10
):
self
.assert_update_all(number)
self
.generate_qubes_handler()
def
assert_update_all(
self
, thread_number):
self
.assertEquals([
False
]
*
5
,
self
._qubes_handler.get_updated_status())
self
.update_templates(thread_number)
self
.assertEquals([
True
]
*
5
,
self
._qubes_handler.get_updated_status())
if
__name__
=
=
'__main__'
:
if
len
(sys.argv)
=
=
1
:
UpdateHandler().update_templates(
3
)
elif
len
(sys.argv) >
2
or
sys.argv[
1
] !
=
'test'
:
print
"ERR Usage: dom0-update-all [test]"
else
:
unittest.main(argv
=
sys.argv[:
1
])