Message from discussion
MapReduce sharded output
MIME-Version: 1.0
Date: Wed, 5 Sep 2012 17:24:28 -0700
Message-ID: <CAHUdCdWV6oaZnFCcpSQdkiJX6tZVGsKn2B941Xu4K0n10k7...@mail.gmail.com>
Subject: Re: [mongodb-user] Re: MapReduce sharded output
From: "cac...@gmail.com" <cac...@gmail.com>
To: mongodb-user@googlegroups.com
Content-Type: multipart/alternative; boundary=bcaec51dd7c5b7c6ed04c8fd8326
--bcaec51dd7c5b7c6ed04c8fd8326
Content-Type: text/plain; charset=ISO-8859-1
So if the map reduce is running in 'reduce' mode, the output collection,
which is also a data source, is hosed if any one of the primary servers
fails? That is, if any one of up to 1,000 servers fails. This would appear
to make incremental map reduce a very chancy proposition in a sharded
environment.
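
To make the concern concrete, here is a minimal Python sketch of why a
partial failure in 'reduce' mode is hard to recover from. It is only a
simulation under stated assumptions: a plain dict stands in for the output
collection, a sum stands in for the reduce function, and the names
incremental_reduce and fail_after are hypothetical, not MongoDB APIs.
Under those assumptions, naively re-running the batch after a partial
failure double-counts the keys that had already been folded in:

```python
def reduce_fn(key, values):
    # Hypothetical reduce function: a simple sum of the values for a key.
    return sum(values)


def incremental_reduce(output, new_results, fail_after=None):
    """Fold new_results into output, mimicking 'reduce' output mode:
    a key that already exists is reduced together with the new value.
    fail_after simulates a failover after that many keys were written."""
    for i, (key, value) in enumerate(sorted(new_results.items())):
        if fail_after is not None and i >= fail_after:
            raise RuntimeError("simulated primary failover")
        if key in output:
            output[key] = reduce_fn(key, [output[key], value])
        else:
            output[key] = value


# Stand-in for the existing output collection and one new batch of results.
output = {"a": 10, "b": 20, "c": 30}
batch = {"a": 1, "b": 2, "c": 3}
intended = {"a": 11, "b": 22, "c": 33}

# First attempt dies after two of the three keys have been processed.
try:
    incremental_reduce(output, batch, fail_after=2)
except RuntimeError:
    pass
after_failure = dict(output)  # "a" and "b" updated, "c" untouched

# Naive recovery: run the same batch again over the half-updated output.
incremental_reduce(output, batch)
after_rerun = dict(output)  # "a" and "b" have now absorbed the batch twice

print(after_failure, after_rerun, intended)
```

With 'merge' or 'replace' output a rerun would overwrite those keys rather
than reduce into them, which is why a plain rerun is a reasonable recovery
there but not in 'reduce' mode.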
Thinking about mitigations for this:
Is there anything resembling cheap copies of collections that could be used
to ensure a more consistent state (i.e., copy-on-write for portions of the
index)? One could then alternate between two copies of the output
collection, ensuring that one is always clean. Or would all of the data
need to be copied each time?
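
A rough Python sketch of the alternating-copy idea, in case it helps the
discussion. Everything here is an assumption for illustration: plain dicts
stand in for the two output collections, copy.deepcopy stands in for the
expensive full copy (I do not know of a cheap copy-on-write equivalent),
and AlternatingOutput and fold_batch are hypothetical names, not MongoDB
features. The pointer to the clean copy only flips after a job finishes:

```python
import copy


def fold_batch(output, batch, fail_after=None):
    """Hypothetical incremental job: add each batch value into output.
    fail_after simulates a failover after that many keys were written."""
    for i, (key, value) in enumerate(sorted(batch.items())):
        if fail_after is not None and i >= fail_after:
            raise RuntimeError("simulated primary failover")
        output[key] = output.get(key, 0) + value


class AlternatingOutput:
    """Two stand-in output collections; exactly one is the clean copy."""

    def __init__(self, initial):
        self.copies = [dict(initial), {}]
        self.clean = 0  # index of the last known-good copy

    def run(self, batch, job):
        scratch = 1 - self.clean
        # Full copy of the clean data: this is the cost being asked about.
        self.copies[scratch] = copy.deepcopy(self.copies[self.clean])
        job(self.copies[scratch], batch)  # may raise partway through
        self.clean = scratch  # flip only after the job succeeded

    def read(self):
        return self.copies[self.clean]


store = AlternatingOutput({"a": 10, "b": 20})
batch = {"a": 1, "b": 2}

# A job that fails midway leaves the clean copy untouched.
try:
    store.run(batch, lambda out, b: fold_batch(out, b, fail_after=1))
except RuntimeError:
    pass
after_failed = dict(store.read())

# Retrying starts again from the clean copy, so nothing is double-counted.
store.run(batch, fold_batch)
after_retry = dict(store.read())

print(after_failed, after_retry)
```

The sketch copies everything on every run; whether something cheaper
exists is exactly the open question.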
Christian
On Wed, Sep 5, 2012 at 4:52 PM, William Z <william.z...@10gen.com> wrote:
>
> Hi Christian!
>
> Thanks for your great questions. Addressing them:
>
>
> 1) The map/reduce job can only run on the primary node of a replica set,
> since that is the only node that allows writes. In the current
> implementation, if a primary node is running a map/reduce job and it steps
> down for any reason, then the entire map/reduce job is aborted at that
> point, and the command returns an error condition to the caller.
>
> 2) In a map/reduce job with both sharded input and sharded output in
> MongoDB version 2.0, the final reduce is run by the 'mongos' process. If
> any input or output shard fails, then the job is aborted at that point. If
> only a portion of the documents have been processed when the output mode is
> 'merge' or 'reduce', then the documents which have been processed will
> remain, while the remaining documents will be unprocessed. No attempt is
> made to recover or retry.
>
> 3) In a map/reduce job with both sharded input and sharded output in
> MongoDB version 2.2, the final reduce is split among all of the shards in
> the cluster. If a single shard fails (that is, if its primary fails over)
> in the middle of the final reduce phase, and the output mode is 'merge' or
> 'reduce', it will have the following impact:
>
> - The documents which have already been processed by that shard will
> remain; documents which have not been processed by that shard will not be
> processed
> - No other shard will take over for the work originally scheduled by the
> failing shard
> - When the secondary takes over, it will not take over for the work
> originally scheduled by the failing node
> - All other shards will run their jobs to completion as much as they are
> able
> - Recall that each shard will query every other shard for the documents
> that are targeted to eventually live on the destination shard. If other
> shards have not yet fetched some documents from the failing node, the other
> shards will not re-connect to the new primary node for the shard, and those
> documents will not be processed by the map/reduce job.
>
> 4) In short, if the primary node of a shard fails over during a map/reduce
> job which outputs to a collection, the output collection is left in an
> unknown state. In the worst case, some subset of documents has been
> processed and some subset hasn't, and there's no good way to tell which.
>
> 5) If you're doing a map/reduce job with 'replace' output mode, then the
> output on each individual shard will either complete fully or fail.
> However, if one shard fails and the others succeed, the overall output
> collection will be in an inconsistent state. Fortunately, the overall
> command will not return 'success' unless all operations fully succeed on
> all shards.
>
> 6) This algorithm for map/reduce processing is new in MongoDB version
> 2.2. Unfortunately, this version is so new that there are no best
> practices yet established. I wish I had a better answer for you.
>
> If you're doing a map/reduce job with 'merge', 'replace', or 'inline'
> output mode, you can simply re-run the job, and you'll get similar output
> to what you would have gotten originally.
>
> 7) This isn't documented anywhere that I could find: I determined this by
> experiment and by examining the source code. I'll be filing a
> documentation ticket in Jira covering what I've discovered.
>
>
> Let me know if you have further questions.
>
> -William
>
>
> On Thursday, August 30, 2012 5:28:48 PM UTC-7, Christian Csar wrote:
>>
>> Thanks William, you've been extremely helpful. Is there some place I
>> should link this discussion (or a place where it's been documented that I
>> failed miserably to find)? The timing of the 2.2 release is certainly
>> convenient for me.
>>
>> While I was hoping that by the time I actually sent this email I would
>> report that everything I tried was fully working, one more question has
>> arisen. For incremental map reduce, what happens if a server fails during
>> the final reduce step? Will the map reduce have completed on some
>> documents in the collection but not others? Will the work continue on
>> another server in the shard so that the reduce will complete for all
>> affected documents? If the entire shard fails, is the appropriate thing
>> just to restore that collection from a previous backup and do an
>> incremental reduce from there? If there are issues, is there a list of
>> them, or a set of best practices to mitigate them?
>>
>> I tried thinking of systems using versioning tags to mitigate the issues
>> but did not come up with one.
>>
>> Christian
>>
--bcaec51dd7c5b7c6ed04c8fd8326--