This is mentioned in the documentation for Union, though Union
doesn't actually work, yet. This seems to work, at least in my
limited testing.
---
MANIFEST | 2 +
lib/MogileFS/ReplicationPolicy/OnDevice.pm | 58 ++++++++++++
t/ondevice-replpol.t | 100 +++++++++++++++++++++
3 files changed, 160 insertions(+)
create mode 100644 lib/MogileFS/ReplicationPolicy/OnDevice.pm
create mode 100644 t/ondevice-replpol.t
diff --git a/MANIFEST b/MANIFEST
index 28a075b..adb07e2 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -33,6 +33,7 @@ lib/MogileFS/Rebalance.pm
lib/MogileFS/ReplicationPolicy.pm
lib/MogileFS/ReplicationPolicy/MultipleDevices.pm
lib/MogileFS/ReplicationPolicy/MultipleHosts.pm
+lib/MogileFS/ReplicationPolicy/OnDevice.pm
lib/MogileFS/ReplicationPolicy/Union.pm
lib/MogileFS/ReplicationRequest.pm
lib/MogileFS/Server.pm
@@ -92,6 +93,7 @@ t/http.t
t/mogstored-shutdown.t
t/multiple-devices-replpol.t
t/multiple-hosts-replpol.t
+t/ondevice-replpol.t
t/replpolicy-parsing.t
t/replpolicy.t
t/store-sqlite.t
diff --git a/lib/MogileFS/ReplicationPolicy/OnDevice.pm b/lib/MogileFS/ReplicationPolicy/OnDevice.pm
new file mode 100644
index 0000000..e92b4b5
--- /dev/null
+++ b/lib/MogileFS/ReplicationPolicy/OnDevice.pm
@@ -0,0 +1,58 @@
+package MogileFS::ReplicationPolicy::OnDevice;
+use strict;
+use warnings;
+use base 'MogileFS::ReplicationPolicy';
+use MogileFS::ReplicationRequest qw(ALL_GOOD TOO_GOOD TEMP_NO_ANSWER);
+
+sub new {
+ my ($class, $devid) = @_;
+ bless { on_devid => $devid }, $class;
+}
+
+sub new_from_policy_args {
+ my ($class, $argref) = @_;
+ # Note: "MultipleDevices()" is okay, in which case the 'mindevcount'
+ # on the class is used. (see below)
+ $$argref =~ s/^\s* \( \s* (\d*) \s* \) \s*//x
+ or die "$class failed to parse args: $$argref";
+ $class->new($1)
+}
+
+sub mindevcount { 1 }
+
+sub replicate_to {
+ my ($self, %args) = @_;
+
+ my $fid = delete $args{fid}; # fid scalar to copy
+ my $on_devs = delete $args{on_devs}; # arrayref of device objects
+ my $all_devs = delete $args{all_devs}; # hashref of { devid => MogileFS::Device }
+ my $failed = delete $args{failed}; # hashref of { devid => 1 } of failed attempts this round
+
+ # this is the per-class mindevcount (the old way), which is passed in
+ # automatically from the replication worker. but if we have our own
+ # configured mindevcount in class.replpolicy, like "MultipleHosts(3)", then
+ # we use the explicit one. otherwise, if blank, or zero, like
+ # "MultipleHosts()", then we use the builtin on
+ delete $args{min}; # ignored
+
+ warn "Unknown parameters: " . join(", ", sort keys %args) if %args;
+ die "Missing parameters" unless $on_devs && $all_devs && $failed && $fid;
+
+ my $on_devid = $self->{on_devid};
+
+ my %on_dev = map { $_->id => 1 } @$on_devs;
+ if ($on_dev{$on_devid}) {
+ return (scalar(keys(%on_dev)) > 1) ? TOO_GOOD : ALL_GOOD;
+ }
+ return TEMP_NO_ANSWER if $failed->{$on_devid};
+
+ my $dev = $all_devs->{$on_devid};
+ if (!$dev) {
+ warn "devid=$on_devid does not exist for OnDevice policy\n";
+ return TEMP_NO_ANSWER;
+ }
+ return TEMP_NO_ANSWER unless $dev->should_get_replicated_files;
+ MogileFS::ReplicationRequest->new(ideal => [ $dev ], desperate => []);
+}
+
+1;
diff --git a/t/ondevice-replpol.t b/t/ondevice-replpol.t
new file mode 100644
index 0000000..c5ca60f
--- /dev/null
+++ b/t/ondevice-replpol.t
@@ -0,0 +1,100 @@
+#!/usr/bin/perl
+
+use strict;
+use warnings;
+use Test::More;
+use FindBin qw($Bin);
+
+use MogileFS::Server;
+use MogileFS::Util qw(error_code);
+use MogileFS::ReplicationPolicy::OnDevice;
+use MogileFS::Test;
+
+# already good.
+is(rr("dev=3 h1[d1=_ d2=_] h2[d3=X d4=_]"),
+ "all_good", "all good");
+
+# need to get it onto target device, ignore min
+is(rr("min=2 dev=3 h1[d1=X d2=_] h2[d3=_ d4=_]"),
+ "ideal(3)", "need d3");
+
+# still needs to be on host2, even though 2 copies on host1
+is(rr("dev=3 h1[d1=X d2=X] h2[d3=_ d4=_]"),
+ "ideal(3)", "need host2 dev3, even though 2 on host1");
+
+is(rr("dev=3 h1[d1=X d2=_] h2=down[d3=_ d4=_]"),
+ 'temp_fail', 'desired device unavailable');
+
+is(rr("dev=3 min=3 h1[d1=X d2=_] h2[d3=X d4=_] h3[d5=X]"),
+ "too_good", 'too good with excess copies');
+
+# be happy with one drain copy
+is(rr("min=2 dev=3 h1[d3=X,drain d5=_] h2[d4=_ d6=_]"),
+ "all_good",
+ "we are happy with copy in a drain device");
+
+# drain copy counts
+is(rr("min=2 dev=3 h1[d3=X,drain d5=_] h2[d4=X d6=_]"),
+ "too_good",
+ "the extra copy in drain leaves us too satisfied");
+
+sub rr {
+ my ($state) = @_;
+ my $ostate = $state; # original
+
+ MogileFS::Factory::Host->t_wipe;
+ MogileFS::Factory::Device->t_wipe;
+ MogileFS::Config->set_config_no_broadcast("min_free_space", 100);
+ my $hfac = MogileFS::Factory::Host->get_factory;
+ my $dfac = MogileFS::Factory::Device->get_factory;
+ my %opt; # min, dev
+
+ while ($state =~ s/^\s*\b(min|dev)=(\d+)\b//) {
+ $opt{$1} = $2;
+ }
+ my $dev = delete($opt{dev}) or die "no dev in $state";
+ my $hosts = {};
+ my $devs = {};
+ my $on_devs = [];
+
+ my $parse_error = sub {
+ die "Can't parse:\n $ostate\n"
+ };
+ while ($state =~ s/\bh(\d+)(?:=(.+?))?\[(.+?)\]//) {
+ my ($n, $opts, $devstr) = ($1, $2, $3);
+ $opts ||= "";
+ die "dup host $n" if $hosts->{$n};
+
+ my $h = $hosts->{$n} = $hfac->set({ hostid => $n,
+ status => ($opts || "alive"), observed_state => "reachable",
+ hostname => $n });
+
+ foreach my $ddecl (split(/\s+/, $devstr)) {
+ $ddecl =~ /^d(\d+)=([_X])(?:,(\w+))?$/
+ or $parse_error->();
+ my ($dn, $on_not, $status) = ($1, $2, $3);
+ die "dup device $dn" if $devs->{$dn};
+ my $d = $devs->{$dn} = $dfac->set({ devid => $dn,
+ hostid => $h->id, observed_state => "writeable",
+ status => ($status || "alive"), mb_total => 1000,
+ mb_used => 100, });
+ if ($on_not eq "X" && $d->dstate->should_have_files) {
+ push @$on_devs, $d;
+ }
+ }
+ }
+ $parse_error->() if $state =~ /\S/;
+
+ my $polclass = "MogileFS::ReplicationPolicy::OnDevice";
+ my $pol = $polclass->new($dev);
+ my $rr = $pol->replicate_to(
+ fid => 1,
+ on_devs => $on_devs,
+ all_devs => $devs,
+ failed => {},
+ %opt,
+ );
+ return $rr->t_as_string;
+}
+
+done_testing();