Following one of my blog posts[1] about Puppetd memory consumption, I decided to also try to add my stone to the edifice.
Since #2892 is almost fixed by now, I decided to spend my time on something else: 1) the json catalog (and its deserialization) and 2) sourced file content.
Those two things can consume memory for a short time (memory which unfortunately stays consumed afterwards, see my blog post for some explanations).
So this patch brings to puppet:
1) support for a new JSON parser: Yajl-ruby[1], which is faster and uses less memory than the regular json parser. Yajl-ruby is available as a gem. To activate it, pass --preferred_serialization_format=yajl
2) use of Yajl-ruby in "streaming" mode: puppetd never reads the whole response body into RAM, but instead feeds Yajl-ruby chunk by chunk. This way, the json serialized catalog is never fully stored in RAM.
3) the file resource source system can stream file content to disk instead of reading the whole response body into RAM and then dumping it to a file.
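To make point 3 concrete, here is a minimal sketch of streaming a response body to disk with Net::HTTP instead of calling response.body. The helper name stream_body_to_file and its arguments are hypothetical and not part of the patch; it only assumes an object that behaves like Net::HTTP (request_get with a block, read_body yielding chunks).

```ruby
require 'net/http'

# Hypothetical helper, not part of the patch: writes the response body to
# dest_path chunk by chunk, so the whole body is never held in memory.
# `http` is assumed to behave like a Net::HTTP connection.
def stream_body_to_file(http, uri_path, headers, dest_path)
  bytes = 0
  http.request_get(uri_path, headers) do |response|
    File.open(dest_path, 'wb') do |f|
      # read_body with a block yields the body piece by piece
      response.read_body do |chunk|
        f.write(chunk)
        bytes += chunk.bytesize
      end
    end
  end
  bytes
end
```

The important detail is that the body is read inside the request_get block; once the block returns, the connection has moved on and the body can no longer be streamed.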
This patch has been lightly tested and seems to work; YMMV, though. I also tried to keep the patch as minimal as possible.
I took some figures while puppetd was sourcing a 100MiB file:
Now to be clear: this patch is a quick and dirty hack. I don't expect it to be merged as is, so I'm waiting for reviews, tests, comments, flames. I even think this patch brings a whole new series of issues and bugs :-)
Note: this patch introduces a new issue regarding #2892: since we no longer have the whole serialized catalog in memory, it will be hard to dump it as is to disk for the local cache. Something clever will need to be done (like streaming the cache...).
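One possible shape for "streaming the cache" is to tee every chunk to a cache file before handing it to the parser. This is only a sketch of the idea, not something the patch implements; tee_to_cache and the temporary-file naming are assumptions made for illustration.

```ruby
# Hypothetical helper, not part of the patch: copies each chunk to a cache
# file and then yields it to the consumer (for instance a streaming parser).
# `chunks` is anything whose #each yields String chunks.
def tee_to_cache(chunks, cache_path)
  tmp_path = "#{cache_path}.tmp"
  File.open(tmp_path, 'wb') do |cache|
    chunks.each do |chunk|
      cache.write(chunk)
      yield chunk
    end
  end
  # Only replace the previous cache once the whole stream has been written.
  File.rename(tmp_path, cache_path)
end
```

Writing to a temporary file first means a failed or interrupted download does not clobber the previous cache.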
Brice Figureau (5):
  Add stream format capability
  Add Yajl-ruby json parser format
  Add a way to capture checksums on a "stream"
  Let the rest client know how to handle stream response
  Allows puppet:// content client streaming
diff --git a/lib/puppet/util/checksum_stream.rb b/lib/puppet/util/checksum_stream.rb
new file mode 100644
index 0000000..0add5fd
--- /dev/null
+++ b/lib/puppet/util/checksum_stream.rb
@@ -0,0 +1,20 @@
+
+class Puppet::Util::ChecksumStream
+    attr_accessor :digest
+
+    def initialize(digest)
+        @digest = digest.reset
+    end
+
+    def update(chunk)
+        digest << chunk
+    end
+
+    def checksum
+        digest.hexdigest
+    end
+
+    def to_s
+        "checksum: #{digest}"
+    end
+end
\ No newline at end of file
diff --git a/lib/puppet/util/checksums.rb b/lib/puppet/util/checksums.rb
index 98bf5de..1b73a86 100644
--- a/lib/puppet/util/checksums.rb
+++ b/lib/puppet/util/checksums.rb
@@ -1,3 +1,5 @@
+require 'puppet/util/checksum_stream'
+
 # A stand-alone module for calculating checksums
 # in a generic way.
 module Puppet::Util::Checksums
@@ -34,6 +36,11 @@ module Puppet::Util::Checksums
         md5_file(filename, true)
     end
 
+    def md5_stream
+        require 'digest/md5'
+        Puppet::Util::ChecksumStream.new(Digest::MD5.new())
+    end
+
     # Return the :mtime timestamp of a file.
     def mtime_file(filename)
         File.stat(filename).send(:mtime)
@@ -63,6 +70,11 @@ module Puppet::Util::Checksums
         sha1_file(filename, true)
     end
 
+    def sha1_stream
+        require 'digest/sha1'
+        Puppet::Util::ChecksumStream.new(Digest::SHA1.new())
+    end
+
     # Return the :ctime of a file.
     def ctime_file(filename)
         File.stat(filename).send(:ctime)
diff --git a/spec/unit/util/checksum_stream.rb b/spec/unit/util/checksum_stream.rb
new file mode 100644
index 0000000..0ace637
--- /dev/null
+++ b/spec/unit/util/checksum_stream.rb
@@ -0,0 +1,23 @@
+
+require File.dirname(__FILE__) + '/../../spec_helper'
+
+require 'puppet/util/checksum_stream'
+
+describe Puppet::Util::ChecksumStream do
+
+    before(:each) do
+        @digest = stub 'digest'
+        @digest.stubs(:reset).returns(@digest)
+        @sum = Puppet::Util::ChecksumStream.new(@digest)
+    end
+
+    it "should add to the digest when update" do
+        @digest.expects(:<<).with("content")
+        @sum.update("content")
+    end
+
+    it "should produce the final checksum" do
+        @digest.expects(:hexdigest).returns("DEADBEEF")
+        @sum.checksum.should == "DEADBEEF"
+    end
+end
\ No newline at end of file
diff --git a/spec/unit/util/checksums.rb b/spec/unit/util/checksums.rb
index d31d7a0..a27229e 100755
--- a/spec/unit/util/checksums.rb
+++ b/spec/unit/util/checksums.rb
@@ -15,6 +15,7 @@ describe Puppet::Util::Checksums do
content_sums.each do |sumtype| it "should be able to calculate %s sums from strings" % sumtype do @@ -28,6 +29,12 @@ describe Puppet::Util::Checksums do end end
+ stream.each do |sumtype| + it "should be able to calculate %s sums from streams" % sumtype do + @summer.should be_respond_to(sumtype.to_s) + end + end + it "should have a method for stripping a sum type from an existing checksum" do @summer.sumtype("{md5}asdfasdfa").should == "md5" end @@ -62,6 +69,11 @@ describe Puppet::Util::Checksums do
@summer.send(sum.to_s + "_file", file).should == :mydigest end + + it "should use #{klass} to seed checksum stream" do + @summer.send("#{sum}_stream").should be_instance_of(Puppet::Util::ChecksumStream) + @summer.send("#{sum}_stream").digest.should be_instance_of(klass) + end end end
 # Access objects via REST
 class Puppet::Indirector::REST < Puppet::Indirector::Terminus
@@ -44,10 +45,11 @@ class Puppet::Indirector::REST < Puppet::Indirector::Terminus
             content_type = response['content-type'].gsub(/\s*;.*$/,'') # strip any appended charset
 
             # Convert the response to a deserialized object.
+            response = Puppet::Network::ResponseStream.new(response)
             if multiple
-                model.convert_from_multiple(content_type, response.body)
+                model.convert_from_multiple(content_type, response)
             else
-                model.convert_from(content_type, response.body)
+                model.convert_from(content_type, response)
             end
         else
             # Raise the http error if we didn't get a 'success' of some kind.
@@ -66,14 +68,16 @@ class Puppet::Indirector::REST < Puppet::Indirector::Terminus
     end
 
     def find(request)
-        return nil unless result = deserialize(network(request).get(indirection2uri(request), headers))
+        return nil unless result = network(request).request_get(indirection2uri(request), headers) do |response|
+            return deserialize(response)
+        end
         result.name = request.key
         result
     end
 
     def search(request)
-        unless result = deserialize(network(request).get(indirection2uri(request), headers), true)
-            return []
+        return [] unless result = network(request).request_get(indirection2uri(request), headers) do |response|
+            return deserialize(response, true)
         end
         return result
     end
diff --git a/lib/puppet/network/response_stream.rb b/lib/puppet/network/response_stream.rb
new file mode 100644
index 0000000..4b7c772
--- /dev/null
+++ b/lib/puppet/network/response_stream.rb
@@ -0,0 +1,45 @@
+
+require 'net/http'
+
+# This is a wrapper around either a Net::HTTPResponse or a String
+# allowing the same interface for the exterior world.
+class Puppet::Network::ResponseStream
+
+    attr_accessor :response
+
+    [:code, :body].each do |m|
+        define_method(m) do
+            response.send(m)
+        end
+    end
+
+    def initialize(content)
+        @response = content
+    end
+
+    def stream?
+        response.is_a?(Net::HTTPResponse)
+    end
+
+    def content
+        response.body
+    end
+
+    def length
+        if stream?
+            response.content_length
+        else
+            response.length
+        end
+    end
+
+    def stream
+        if stream?
+            response.read_body do |r|
+                yield r
+            end
+        else
+            yield response.body
+        end
+    end
+end
\ No newline at end of file
diff --git a/spec/unit/indirector/rest.rb b/spec/unit/indirector/rest.rb
index d12e3c6..fb6c03c 100755
--- a/spec/unit/indirector/rest.rb
+++ b/spec/unit/indirector/rest.rb
@@ -123,35 +123,42 @@ describe Puppet::Indirector::REST do
             }
it "should return the results of converting from the format specified by the content-type header if the response code is in the 200s" do - @model.expects(:convert_from).with("myformat", "mydata").returns "myobject" - response = mock 'response' response.stubs(:[]).with("content-type").returns "myformat" response.stubs(:body).returns "mydata" response.stubs(:code).returns "200" + stream = stub 'stream' + Puppet::Network::ResponseStream.expects(:new).with(response).returns(stream ) + + @model.expects(:convert_from).with("myformat", stream).returns "myobject"
@searcher.deserialize(response).should == "myobject" end
it "should convert and return multiple instances if the return code is in the 200s and 'multiple' is specified" do - @model.expects(:convert_from_multiple).with("myformat", "mydata").returns "myobjects" - response = mock 'response' response.stubs(:[]).with("content-type").returns "myformat" response.stubs(:body).returns "mydata" response.stubs(:code).returns "200" + stream = stub 'stream' + Puppet::Network::ResponseStream.expects(:new).with(response).returns(stream ) + + @model.expects(:convert_from_multiple).with("myformat", stream).returns "myobjects"
@searcher.deserialize(response, true).should == "myobjects" end
it "should strip the content-type header to keep only the mime-type" do - @model.expects(:convert_from).with("text/plain", "mydata").returns "myobject" - response = mock 'response' response.stubs(:[]).with("content-type").returns "text/plain; charset=utf-8" response.stubs(:body).returns "mydata" response.stubs(:code).returns "200"
+ stream = stub 'stream' + Puppet::Network::ResponseStream.expects(:new).with(response).returns(stream ) + + @model.expects(:convert_from).with("text/plain", stream).returns "myobject" + @searcher.deserialize(response) end end @@ -186,7 +193,8 @@ describe Puppet::Indirector::REST do
describe "when doing a find" do before :each do - @connection = stub('mock http connection', :get => @response) + @connection = stub('mock http connection') + @connection.stubs(:request_get).yields(@response) @searcher.stubs(:network).returns(@connection) # neuter the network connection
# Use a key with spaces, so we can test escaping @@ -195,27 +203,27 @@ describe Puppet::Indirector::REST do
it "should call the GET http method on a network connection" do @searcher.expects(:network).returns @connection - @connection.expects(:get).returns @response + @connection.expects(:request_get).yields @response @searcher.find(@request) end
it "should deserialize and return the http response" do - @connection.expects(:get).returns @response
it "should use the URI generated by the Handler module" do @searcher.expects(:indirection2uri).with(@request).returns "/my/uri" - @connection.expects(:get).with { |path, args| path == "/my/uri" }.returns(@response) + @connection.expects(:request_get).with { |path, args| path == "/my/uri" }.yields(@response) @searcher.find(@request) end
it "should provide an Accept header containing the list of supported formats joined with commas" do - @connection.expects(:get).with { |path, args| args["Accept"] == "supported, formats" }.returns(@response) + @connection.expects(:request_get).with { |path, args| args["Accept"] == "supported, formats" }.yields(@response)
@searcher.model.expects(:supported_formats).returns %w{supported formats} @searcher.find(@request) @@ -227,7 +235,7 @@ describe Puppet::Indirector::REST do end
it "should set the name of the resulting instance to the asked-for name" do - @searcher.expects(:deserialize).with(@response).returns @instance + @connection.expects(:request_get).returns @instance @instance.expects(:name=).with "foo bar" @searcher.find(@request) end @@ -240,7 +248,8 @@ describe Puppet::Indirector::REST do
describe "when doing a search" do before :each do - @connection = stub('mock http connection', :get => @response) + @connection = stub('mock http connection') + @connection.stubs(:request_get).yields(@response) @searcher.stubs(:network).returns(@connection) # neuter the network connection
@model.stubs(:convert_from_multiple) @@ -250,12 +259,12 @@ describe Puppet::Indirector::REST do
it "should call the GET http method on a network connection" do @searcher.expects(:network).returns @connection - @connection.expects(:get).returns @response + @connection.expects(:request_get).yields @response @searcher.search(@request) end
it "should deserialize as multiple instances and return the http response" do - @connection.expects(:get).returns @response + @connection.expects(:request_get).yields(@response).returns "myobject" @searcher.expects(:deserialize).with(@response, true).returns "myobject"
@searcher.search(@request).should == 'myobject' @@ -263,19 +272,19 @@ describe Puppet::Indirector::REST do
it "should use the URI generated by the Handler module" do
...
diff --git a/lib/puppet/network/format.rb b/lib/puppet/network/format.rb
index d781242..b568aa7 100644
--- a/lib/puppet/network/format.rb
+++ b/lib/puppet/network/format.rb
@@ -89,6 +89,10 @@ class Puppet::Network::Format
         suitable? and required_methods_present?(klass)
     end
 
+    def support_stream?
+        false
+    end
+
     def to_s
         "Puppet::Network::Format[%s]" % name
     end
diff --git a/lib/puppet/network/format_handler.rb b/lib/puppet/network/format_handler.rb
index ea8cf35..1435b99 100644
--- a/lib/puppet/network/format_handler.rb
+++ b/lib/puppet/network/format_handler.rb
@@ -23,7 +23,7 @@ module Puppet::Network::FormatHandler
             @format = format
         end
 
-        [:intern, :intern_multiple, :render, :render_multiple, :mime].each do |method|
+        [:intern, :intern_multiple, :render, :render_multiple, :mime, :support_stream?].each do |method|
             define_method(method) do |*args|
                 protect(method, args)
             end
@@ -93,12 +93,21 @@ module Puppet::Network::FormatHandler
             Puppet::Network::FormatHandler
         end
 
+        def decapsulate(format, data)
+            format = format_handler.protected_format(format)
+
+            data = data.content if !format.support_stream? and data.respond_to?(:stream?) and data.stream?
+            [format, data]
+        end
+
         def convert_from(format, data)
-            format_handler.protected_format(format).intern(self, data)
+            format, data = decapsulate(format, data)
+            format.intern(self, data)
         end
 
         def convert_from_multiple(format, data)
-            format_handler.protected_format(format).intern_multiple(self, data)
+            format, data = decapsulate(format, data)
+            format.intern_multiple(self, data)
         end
+        def support_stream?(name)
+            Puppet::Network::FormatHandler.format(name).support_stream?
+        end
+
         def supported_formats
             result = format_handler.formats.collect { |f| format_handler.format(f) }.find_all { |f| f.supported?(self) }.collect { |f| f.name }.sort do |a, b|
                 # It's an inverse sort -- higher weight formats go first.
@@ -164,6 +177,10 @@ module Puppet::Network::FormatHandler
         def support_format?(name)
             self.class.support_format?(name)
         end
+
+        def support_stream?(name)
+            self.class.support_stream?(name)
+        end
     end
 end
diff --git a/spec/unit/network/format.rb b/spec/unit/network/format.rb index e5a6b59..feefd68 100755 --- a/spec/unit/network/format.rb +++ b/spec/unit/network/format.rb @@ -66,6 +66,10 @@ describe Puppet::Network::Format do @format.should respond_to(:supported?) end
+ it "should be able to tell it supports streaming" do + @format.should respond_to(:support_stream?) + end + it "should consider a class to be supported if it has the individual and multiple methods for rendering and interning" do @format.should be_supported(FormatRenderer) end diff --git a/spec/unit/network/format_handler.rb b/spec/unit/network/format_handler.rb index 110effe..a5a68ea 100755 --- a/spec/unit/network/format_handler.rb +++ b/spec/unit/network/format_handler.rb @@ -117,6 +117,7 @@ describe Puppet::Network::FormatHandler do @format = mock 'format' @format.stubs(:supported?).returns true @format.stubs(:name).returns :my_format + @format.stubs(:support_stream?).returns false Puppet::Network::FormatHandler.stubs(:format).with(:my_format).returns @format Puppet::Network::FormatHandler.stubs(:mime).with("text/myformat").returns @format Puppet::Network::Format.stubs(:===).returns false @@ -132,6 +133,15 @@ describe Puppet::Network::FormatHandler do FormatTester.support_format?(:my_format) end
+ it "should be able to test whether a format supports streaming" do + FormatTester.should respond_to(:support_stream?) + end + + it "should use the Format to determine whether a given format supports streaming" do + @format.expects(:support_stream?) + FormatTester.support_stream?(:my_format) + end + it "should be able to convert from a given format" do FormatTester.should respond_to(:convert_from) end @@ -141,6 +151,29 @@ describe Puppet::Network::FormatHandler do FormatTester.convert_from(:my_format, "mydata") end
+ it "should not decapsulate the content when converting with a format that supports stream" do + data = stub_everything 'data' + data.expects(:content).never + @format.expects(:support_stream?).returns(true) + @format.expects(:intern).with(FormatTester, data) + FormatTester.convert_from(:my_format, data) + end + + it "should not decapsulate the content when converting streamable data" do + data = stub_everything 'data', :stream? => true + data.expects(:content).never + @format.expects(:support_stream?).returns(true) + @format.expects(:intern).with(FormatTester, data) + FormatTester.convert_from(:my_format, data) + end + + it "should decapsulate the content when converting streamable data with a non streamable format" do + data = stub_everything 'data', :stream? => true + data.expects(:content).returns("mydata") + @format.expects(:intern).with(FormatTester, "mydata") + FormatTester.convert_from(:my_format, data) + end + it "should call the format-specific converter when asked to convert from a given format by mime-type" do @format.expects(:intern).with(FormatTester, "mydata") FormatTester.convert_from("text/myformat", "mydata") -- 1.6.6.1
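The decapsulation rule introduced by this first patch can be summarised with a small stand-alone sketch. StreamWrapper below is a hypothetical stand-in for Puppet::Network::ResponseStream, and this decapsulate takes a plain boolean instead of a format object, so it illustrates the rule rather than copying the patched code.

```ruby
# Hypothetical stand-in for the wrapper added by the patch.
class StreamWrapper
  def initialize(chunks)
    @chunks = chunks
  end

  def stream?
    true
  end

  # Full body as one String (this is what costs memory).
  def content
    @chunks.join
  end

  def stream
    @chunks.each { |c| yield c }
  end
end

# Formats that cannot stream get the full content; streaming formats
# (and plain strings) receive the data untouched.
def decapsulate(format_supports_stream, data)
  if !format_supports_stream && data.respond_to?(:stream?) && data.stream?
    data.content
  else
    data
  end
end
```

This keeps every existing format working unchanged: only formats that opt in through support_stream? ever see the wrapper.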
This specific format can parse and render json in a streaming way. To activate it, pass --preferred_serialization_format=yajl
Apparently pson serializes ruby objects by calling to_s, whereas Yajl puts their public properties in a hash. Since ResourceReference doesn't have a specific to_pson_data_hash, we force it to serialize as a string, which gives the same result as with pson.
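The streaming parse can be sketched independently of Puppet as follows. The parser argument is assumed to expose the yajl-ruby streaming interface used in the patch (on_parse_complete= to register a callback, << to feed a chunk); parse_chunks and the un-namespaced ParsingComplete are illustrative names only.

```ruby
# Collects every document the parser reports as complete, in the spirit of
# the Puppet::ParsingComplete helper from the patch.
class ParsingComplete
  attr_reader :result

  def initialize
    @result = []
  end

  def complete(object)
    @result << object
  end
end

# Feeds the parser chunk by chunk; the full JSON text is never assembled.
# `chunks` is anything whose #each yields String chunks.
def parse_chunks(parser, chunks)
  collector = ParsingComplete.new
  parser.on_parse_complete = collector.method(:complete)
  chunks.each { |chunk| parser << chunk }
  result = collector.result
  result.size == 1 ? result.first : result
end

# With the yajl-ruby gem installed, usage would look like:
#   require 'yajl'
#   parse_chunks(Yajl::Parser.new, ['{"type":"PsonIntTest",', '"data":["foo"]}'])
```

Chunk boundaries can fall anywhere, including in the middle of a key or value, which is why the parser, not the caller, has to decide when a document is complete.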
diff --git a/lib/puppet/feature/yajl.rb b/lib/puppet/feature/yajl.rb
new file mode 100644
index 0000000..b4d4954
--- /dev/null
+++ b/lib/puppet/feature/yajl.rb
@@ -0,0 +1,24 @@
+require 'puppet/util/feature'
+
+# We want this to load if possible, but it's not automatically
+# required.
+Puppet.features.rubygems?
+Puppet.features.add(:yajl) do
+    found = false
+    begin
+        require 'rubygems'
+        require 'yajl'
+
+        #Yajl::Encoder.enable_json_gem_compatability
+
+        class ::Object
+            def to_pson(*args, &block)
+                "\"#{to_s}\""
+            end
+        end
+
+        found = true
+    rescue LoadError => detail
+    end
+    found
+end
diff --git a/lib/puppet/network/formats.rb b/lib/puppet/network/formats.rb
index a98dcbc..8b9b68d 100644
--- a/lib/puppet/network/formats.rb
+++ b/lib/puppet/network/formats.rb
@@ -186,3 +186,111 @@ Puppet::Network::FormatHandler.create(:pson, :mime => "text/pson", :weight => 10
         klass.from_pson(data)
     end
 end
+
+Puppet::Network::FormatHandler.create(:yajl, :mime => "text/yajl", :weight => 20) do
+    confine :true => Puppet.features.yajl?
+
+    class Puppet::ParsingComplete
+        attr_accessor :result
+
+        def initialize
+            @result = []
+        end
+
+        def complete(object)
+            @result << object
+        end
+    end
+
+    module ::PSON
+        alias :old_parse :parse
+        def parse(string)
+            Yajl::Parser.parse(string)
+        end
+    end
+
+    def parse(content)
+        if content.respond_to?(:stream?)
+            unless content.stream?
+                Yajl::Parser.parse(content.content)
+            else
+                complete = Puppet::ParsingComplete.new
+                parser = Yajl::Parser.new
+                parser.on_parse_complete = complete.method(:complete)
+                content.stream do |r|
+                    parser << r
+                end
+                result = complete.result
+                return result.shift if result.size == 1
+                result
+            end
+        else
+            Yajl::Parser.parse(content)
+        end
+    end
+
+    def intern(klass, content)
+        data_to_instance(klass, parse(content))
+    end
+
+    def intern_multiple(klass, content)
+        parse(content).collect do |data|
+            data_to_instance(klass, data)
+        end
+    end
+
+    def render(instance)
+        Yajl::Encoder.encode(instance_to_data(instance))
+    end
+
+    def render_multiple(instances)
+        out = ""
+        encoder = Yajl::Encoder.new
+        instances.collect do |i|
+            out << encoder.encode(instance_to_data(i))
+        end
+        out
+    end
+
+    # supported only for brave souls installing yajl
+    def supported?(klass)
+        Puppet.features.yajl?
+    end
+
+    def support_stream?
+        # of course we do
+        true
+    end
+
+    # If they pass class information, we want to ignore it. By default,
+    # we'll include class information but we won't rely on it - we don't
+    # want class names to be required because we then can't change our
+    # internal class names, which is bad.
+    def data_to_instance(klass, data)
+        if data.is_a?(Hash) and d = data['data']
+            data = d
+        end
+        if data.is_a?(klass)
+            return data
+        end
+        klass.from_pson(data)
+    end
+
+    # recursively call to_pson_data_hash on objects
+    # supporting it
+    def instance_to_data(instance)
+        instance = instance.to_pson_data_hash if instance.respond_to?(:to_pson_data_hash)
+        case instance
+        when Hash
+            instance = instance.inject({}) do |h, (k,v)|
+                h[k] = instance_to_data(v)
+                h
+            end
+        when Array
+            instance.collect! do |i|
+                instance_to_data(i)
+            end
+        end
+        instance
+    end
+end
diff --git a/lib/puppet/resource.rb b/lib/puppet/resource.rb
index 91dd547..2bf99c0 100644
--- a/lib/puppet/resource.rb
+++ b/lib/puppet/resource.rb
@@ -53,7 +53,7 @@ class Puppet::Resource
             # Don't duplicate the title as the namevar
             next hash if param == namevar and value == title
-            hash[param] = value
+            hash[param] = value.is_a?(Puppet::Resource::Reference) ? value.to_s : value
             hash
         end
diff --git a/spec/integration/network/formats.rb b/spec/integration/network/formats.rb index 35e7977..6eef347 100755 --- a/spec/integration/network/formats.rb +++ b/spec/integration/network/formats.rb @@ -18,11 +18,15 @@ class PsonIntTest @string = string end
def self.canonical_order(s) @@ -108,3 +112,77 @@ describe Puppet::Network::FormatHandler.format(:pson) do end end end + +describe Puppet::Network::FormatHandler.format(:yajl) do + describe "when yajl is absent" do + confine "'yajl' library is present" => (! Puppet.features.yajl?) + + before do + @yajl = Puppet::Network::FormatHandler.format(:yajl) + end + + it "should not be suitable" do + @yajl.should_not be_suitable + end + + it "should not be supported" do + @yajl.should_not be_supported + end + end + + describe "when yajl is available" do + confine "Missing 'yajl' library" => Puppet.features.yajl? + + before do + @yajl = Puppet::Network::FormatHandler.format(:yajl) + end + + it "should be able to render an instance to json" do + instance = PsonIntTest.new("foo") + PsonIntTest.canonical_order(@yajl.render(instance)).should == PsonIntTest.canonical_order('{"type":"PsonIntTest","data":["foo"]}' ) + end + + it "should be able to render arrays to json" do + @yajl.render([1,2]).should == '[1,2]' + end + + it "should be able to render arrays containing hashes to json" do + @yajl.render([{"one"=>1},{"two"=>2}]).should == '[{"one":1},{"two":2}]' + end + + it "should be able to render multiple instances to json" do + Puppet.features.add(:yajl, :libs => %w{yajl}) + + one = PsonIntTest.new("one") + two = PsonIntTest.new("two") + + PsonIntTest.canonical_order(@yajl.render([one,two])).should == PsonIntTest.canonical_order('[{"type":"PsonIntTest","data":["one"]},{"type" :"PsonIntTest","data":["two"]}]') + end + + it "should be able to intern a stream" do + content = stub 'stream', :stream? 
=> true + content.expects(:stream).multiple_yields('{"type":"PsonIntTest",', '"data":["foo"]}') + @yajl.intern(PsonIntTest, content).should == PsonIntTest.new("foo") + end + + it "should be able to intern json into an instance" do + @yajl.intern(PsonIntTest, '{"type":"PsonIntTest","data":["foo"]}').should == PsonIntTest.new("foo") + end + + it "should be able to intern json with no class information into an instance" do + @yajl.intern(PsonIntTest, '["foo"]').should == PsonIntTest.new("foo") + end + + it "should be able to intern multiple instances from json" do + @yajl.intern_multiple(PsonIntTest, '[{"type": "PsonIntTest", "data": ["one"]},{"type": "PsonIntTest", "data": ["two"]}]').should == [ + PsonIntTest.new("one"), PsonIntTest.new("two") + ] + end + + it "should be able to intern multiple instances from json with no class information" do + @yajl.intern_multiple(PsonIntTest, '[["one"],["two"]]').should == [ + PsonIntTest.new("one"), PsonIntTest.new("two") + ] + end + end +end diff --git a/spec/unit/network/formats.rb b/spec/unit/network/formats.rb index a241306..208c6f4 100755 --- a/spec/unit/network/formats.rb +++ b/spec/unit/network/formats.rb @@ -18,11 +18,15 @@ class PsonTest @string = string end
- def to_pson(*args) + def to_pson_data_hash { 'type' => self.class.name, 'data' => @string - }.to_pson(*args) + } + end + + def to_pson(*args) + to_pson_data_hash.to_pson(*args) end end
@@ -362,4 +366,102 @@ describe "Puppet Network Format" do end end end + + + it "should include a yajl format" do + Puppet::Network::FormatHandler.format(:yajl).should_not be_nil + end + + describe "yajl" do + confine "Missing 'yajl' library" => Puppet.features.yajl? + + before do + @yajl = Puppet::Network::FormatHandler.format(:yajl) + end + + it "should have its mime type set to text/yajl" do + Puppet::Network::FormatHandler.format(:yajl).mime.should == "text/yajl" + end
...
When a file source property requests file content, the request is routed (indirected) to P::I::FileContent::Rest. Instead of relying on the general Rest subsystem to get a raw response containing the full file content, this class creates a special model using a DeferredResponse. Later, when the file type wants to write the file to disk, it "streams" it through the DeferredResponse, which lazily executes the network request. Due to the poor Net::HTTP implementation, we're forced to use a new Thread to do the streaming. Each chunk read is pushed to a SizedQueue, from which it is fetched by the main thread doing the on-disk store. The new checksum is computed at the same time, chunk by chunk.
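The thread and queue hand-off described above can be sketched with the standard library only. stream_with_checksum is a hypothetical name, and `source` stands in for the network side (in the patch, Net::HTTPResponse#read_body running in the producer thread); the real DeferredResponse also has to synchronise on the start of the response, which is left out here.

```ruby
require 'thread'
require 'digest/md5'

# Hypothetical helper illustrating the producer/consumer hand-off.
# `source` is anything whose #each yields String chunks; `io` is the
# destination being written by the calling (main) thread.
def stream_with_checksum(source, io, digest = Digest::MD5.new)
  queue = SizedQueue.new(1) # at most one chunk in flight

  producer = Thread.new do
    begin
      source.each { |chunk| queue << chunk }
    ensure
      queue << nil # end-of-stream marker, sent even on error
    end
  end

  # Consumer: checksum and write each chunk as it arrives.
  while (chunk = queue.pop)
    digest << chunk
    io.write(chunk)
  end

  producer.join # re-raises any exception from the producer thread
  digest.hexdigest
end
```

A SizedQueue of size 1 gives back-pressure: the reader cannot run ahead of the writer, so memory use stays bounded by roughly one chunk.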
diff --git a/lib/puppet/file_serving/content_stream.rb b/lib/puppet/file_serving/content_stream.rb
new file mode 100644
index 0000000..2f0b1a7
--- /dev/null
+++ b/lib/puppet/file_serving/content_stream.rb
@@ -0,0 +1,23 @@
+#
+#  Created by Luke Kanies on 2007-10-16.
+#  Copyright (c) 2007. All rights reserved.
+
+require 'puppet/indirector'
+require 'puppet/file_serving'
+require 'puppet/file_serving/content'
+
+# A class that handles retrieving file contents.
+# It only reads the file when its content is specifically
+# asked for.
+class Puppet::FileServing::ContentStream < Puppet::FileServing::Content
+
+    def self.create(content)
+        instance = new("/this/is/a/fake/path")
+        instance.content = content
+        instance
+    end
+
+    def self.from_raw(content)
+        content
+    end
+end
diff --git a/lib/puppet/indirector/file_content/rest.rb b/lib/puppet/indirector/file_content/rest.rb
index 7b3cade..6da9103 100644
--- a/lib/puppet/indirector/file_content/rest.rb
+++ b/lib/puppet/indirector/file_content/rest.rb
@@ -2,10 +2,18 @@
 #  Created by Luke Kanies on 2007-10-18.
 #  Copyright (c) 2007. All rights reserved.
 class Puppet::Indirector::FileContent::Rest < Puppet::Indirector::REST
     desc "Retrieve file contents via a REST HTTP interface."
+
+    # let's cheat a little bit:
+    # instead of returning a "model" we're returning something pretending to be a model
+    # which instead will fire a deferred request when needed
+    def find(request)
+        return Puppet::FileServing::ContentStream.create(Puppet::Network::DeferredResponse.new(request, indirection2uri(request), headers, self))
+    end
 end
diff --git a/lib/puppet/network/deferred_response.rb b/lib/puppet/network/deferred_response.rb
new file mode 100644
index 0000000..9175e72
--- /dev/null
+++ b/lib/puppet/network/deferred_response.rb
@@ -0,0 +1,94 @@
+require 'net/http'
+require 'thread'
+
+class Puppet::Network::DeferredResponse
+
+    attr_accessor :request, :uri, :headers, :rest
+    attr_accessor :response
+    attr_accessor :request_started
+
+    def initialize(request, uri, headers, rest)
+        @request = request
+        @uri = uri
+        @headers = headers
+        @rest = rest
+        @mutex = Mutex.new
+        @chunk_queue = SizedQueue.new(1)
+        @got_response = ConditionVariable.new
+        @chunk = nil
+        @request_started = false
+    end
+
+    def stream?
+        true
+    end
+
+    def length
+        start_request
+        response.content_length
+    end
+
+    def start_request
+        # bail out early if we already started the request
+        return if request_started?
+
+        # launch the network request
+        Thread.new do
+            rest.network(request).request_get(uri, headers) do |response|
+                # we got a response from server
+                @mutex.synchronize do
+                    @request_started = true
+                    @response = response
+                    @got_response.signal
+                end
+
+                unless content = rest.deserialize(response)
+                    fail "Could not find any content at %s" % request
+                end
+
+                stream_response(content.content)
+            end
+        end
+
+        # wait for the start of the response to be available
+        @mutex.synchronize do
+            @got_response.wait(@mutex) unless @request_started
+        end
+    end
+
+    def stream
+        start_request
+        while true do
+            unless chunk = @chunk_queue.pop
+                # it's the end
+                break
+            end
+
+            @checksum.update(chunk) if @checksum
+
+            yield chunk
+        end
+        return @checksum
+    end
+
+    # if we want the stream to be checksummed on the fly
+    # either set a Digest or a Puppet::Util::ChecksumStream
+    def checksum=(checksum)
+        @checksum = checksum
+    end
+
+    def request_started?
+        @mutex.synchronize do
+            return @request_started
+        end
+    end
+
+    def stream_response(stream)
+        stream.response.read_body do |chunk|
+            # send chunks to our consumer
+            @chunk_queue << chunk
+        end
+        # it's over guys
+        @chunk_queue << nil
+    end
+end
\ No newline at end of file
diff --git a/lib/puppet/network/formats.rb b/lib/puppet/network/formats.rb
index 8b9b68d..19053ec 100644
--- a/lib/puppet/network/formats.rb
+++ b/lib/puppet/network/formats.rb
@@ -143,6 +143,10 @@ Puppet::Network::FormatHandler.create(:raw, :mime => "application/x-raw", :weigh
         raise NotImplementedError
     end
 
+    def support_stream?
+        true
+    end
+
     # LAK:NOTE The format system isn't currently flexible enough to handle
     # what I need to support raw formats just for individual instances (rather
     # than both individual and collections), but we don't yet have enough data
diff --git a/lib/puppet/type/file.rb b/lib/puppet/type/file.rb
index 2f5b5df..3d3b8b6 100644
--- a/lib/puppet/type/file.rb
+++ b/lib/puppet/type/file.rb
@@ -710,7 +710,11 @@ module Puppet
         if validate = validate_checksum?
             # Use the appropriate checksum type -- md5, md5lite, etc.
             sumtype = property(:checksum).checktype
-            checksum ||= "{#{sumtype}}" + property(:checksum).send(sumtype, content)
+            if content.is_a?(String)
+                checksum ||= "{#{sumtype}}" + property(:checksum).send(sumtype, content)
+            else
+                content.checksum = property(:checksum).send("#{sumtype}_stream")
+            end
         end
         Puppet::Util.withumask(umask) do
-            File.open(path, File::CREAT|File::WRONLY|File::TRUNC, mode) { |f| f.print content }
+            File.open(path, File::CREAT|File::WRONLY|File::TRUNC, mode) do |f|
+                if content.is_a?(String)
+                    f.print content
+                else # we're in front of a stream
+                    newchecksum = content.stream do |c|
+                        f.print c
+                    end
+                    if validate and newchecksum
+                        checksum ||= "{#{sumtype}}" + newchecksum.checksum
+                    end
+                end
+            end
         end
# And put our new file in place diff --git a/spec/integration/network/deferred_response.rb b/spec/integration/network/deferred_response.rb new file mode 100644 index 0000000..01cb6ba --- /dev/null +++ b/spec/integration/network/deferred_response.rb @@ -0,0 +1,34 @@ +#!/usr/bin/env ruby + +require File.dirname(__FILE__) + '/../../spec_helper' + +require 'puppet/network/deferred_response' + +describe Puppet::Network::DeferredResponse do + before(:each) do + @request = stub_everything 'request' + @uri = stub_everything 'uri' + @headers = stub_everything 'headers' + + @request = stub_everything 'request' + @http = stub_everything 'http' + @rest = stub_everything 'rest', :network => @http + + @response = stub_everything 'response' + @http.stubs(:request_get).yields(@response) + + @stream = stub_everything 'stream', :response => @response + @response.stubs(:read_body).multiple_yields("chunk1", "chunk2") + @content = stub_everything 'content', :content => @stream + + @rest.stubs(:deserialize).returns(@content) + + @deferred = Puppet::Network::DeferredResponse.new(@request, @uri, @headers, @rest) + end + + it "should pass chunks to the main thread" do + @deferred.stream do |c| + c.should match /^chunk\d/ + end + end +end \ No newline at end of file diff --git
...
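For readers following the file.rb hunk: the idea is that each chunk is written out as it arrives while a digest is updated on the side, so the whole body never sits in memory. A minimal sketch of that idea follows. The helper name write_stream_with_checksum is hypothetical and not part of the patch, and a StringIO stands in for the target file.

```ruby
require 'digest/md5'
require 'stringio'

# Hypothetical helper (not from the patch): writes each chunk to `io`
# as it arrives and feeds the same chunk to a running digest.
# Returns the hex checksum of everything written.
def write_stream_with_checksum(chunks, io, digest = Digest::MD5.new)
  chunks.each do |chunk|
    io.write(chunk)  # the chunk goes to the target right away
    digest << chunk  # the digest is updated incrementally
  end
  digest.hexdigest
end

# A StringIO stands in for the file on disk in this sketch.
target = StringIO.new
sum = write_stream_with_checksum(["chunk1", "chunk2"], target)

# The incremental checksum matches the checksum of the whole content.
puts sum == Digest::MD5.hexdigest("chunk1chunk2")
```

Only one chunk is held at a time, which is the memory property the patch is after; the checksum only becomes available once the last chunk has been written.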
I am confused about a number of things with this patch; I'll pick out one that seems the easiest to explain in the hopes that the answer will align my thinking:
I don't see how the following could work:
 def find(request)
-    return nil unless result = deserialize(network(request).get(indirection2uri(request), headers))
+    return nil unless result = network(request).request_get(indirection2uri(request), headers) do |response|
+        return deserialize(response)
+    end
     result.name = request.key
     result
 end
Specifically, if the block containing "return deserialize(response)" is called/yielded "live" I believe it should exit from the find method without the result name being set to the result key, whereas if it is stashed and called later (after find has returned) it should produce a LocalJumpError.
If this code works, I'm obviously missing something. Any idea what?
49
22
lambda-do.rb:15:in `bar': unexpected return (LocalJumpError)
        from lambda-do.rb:21:in `call'
        from lambda-do.rb:21
The foo(7) (use immediately) case returns the result from the block (x*x, or 49) and never gets to the last bit (the 2*x); the bar(11) case returns the 2*x but errors out when you try to call the stored block.
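The lambda-do.rb script itself is not reproduced in the thread. A minimal sketch that behaves the way the output describes might look like the following; the helpers call_now and stash and the STASH constant are hypothetical names, and the line numbers in the backtrace will not match.

```ruby
STASH = []

# Yields to its block immediately.
def call_now
  yield
end

# Keeps the block for later without calling it.
def stash(&block)
  STASH << block
end

def foo(x)
  call_now { return x * x }  # `return` leaves foo itself
  2 * x                      # never reached
end

def bar(x)
  stash { return x * x }     # block is stored, not run
  2 * x                      # so bar returns this value
end

puts foo(7)   # prints 49
puts bar(11)  # prints 22

begin
  STASH.last.call            # bar has already returned
rescue LocalJumpError => e
  puts "LocalJumpError: #{e.message}"
end
```

In both cases the `return` inside the block belongs to the method where the block was written, not to the method that yields to it.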
> I am confused about a number of things with this patch; I'll pick out
> one that seems the easiest to explain in the hopes that the answer
> will align my thinking:
> I don't see how the following could work:
>  def find(request)
> -    return nil unless result = deserialize(network(request).get(indirection2uri(request), headers))
> +    return nil unless result = network(request).request_get(indirection2uri(request), headers) do |response|
> +        return deserialize(response)
> +    end
>      result.name = request.key
>      result
>  end
What I understood is that returning a value in this block would end up in "result". And indeed I was able to deserialize catalogs, certificates and such, so I suppose it is working fine.
> Specifically, if the block containing "return deserialize(response)"
> is called/yielded "live" I believe it should exit from the find method
> without the result name being set to the result key, whereas if it is
> stashed and called later (after find has returned) it should produce a
> LocalJumpError.
To me it would return from the method that yields it (i.e. request_get), but that's a wild guess.
> If this code works, I'm obviously missing something. Any idea what?
Your ruby is way better than mine, so I'd tend to trust what you say :-)
So you would write the code like this (sorry for the wrapping)?

network(request).request_get(indirection2uri(request), headers) do |response|
    result = deserialize(response)
end
return nil unless result
result.name = request.key
result
If that's preferable, this is an easy change.
--
Brice Figureau
My Blog: http://www.masterzen.fr/
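One caveat with assigning inside the block: in Ruby, a variable first assigned inside a block is local to that block, so it has to exist before the block for the value to be visible afterwards. A minimal sketch, where fake_request_get is a hypothetical stand-in for the real request method:

```ruby
# Hypothetical stand-in for request_get: yields one response to the block.
def fake_request_get
  yield "response body"
end

def find_without_predeclare
  # `result` is created inside the block, so it is block-local.
  fake_request_get { |response| result = response.upcase }
  result  # not a known local here, so this raises NameError
rescue NameError
  :not_visible
end

def find_with_predeclare
  result = nil  # declared in the method scope first
  fake_request_get { |response| result = response.upcase }
  result        # the block updated the outer variable
end

p find_without_predeclare  # prints :not_visible
p find_with_predeclare     # prints "RESPONSE BODY"
```

So the rewritten find would presumably need a `result = nil` before the request_get call.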
> 49
> 22
> lambda-do.rb:15:in `bar': unexpected return (LocalJumpError)
>         from lambda-do.rb:21:in `call'
>         from lambda-do.rb:21
> The foo(7) (use immediately) case returns the result from the block (x*x, or
> 49) and never gets to the last bit (the 2*x); the bar(11) case returns the
> 2*x but errors out when you try to call the stored block.
So my code is incorrect because it would skip the end of request_get, which certainly closes/finishes the network things.
--
Brice Figureau
My Blog: http://www.masterzen.fr/
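To make the concern concrete: a `return` inside the block unwinds through the yielding method, so code placed after the `yield` is skipped, while anything in an `ensure` clause still runs. The sketch below uses a hypothetical fake_request method; it makes no claim about how Net::HTTP actually arranges its cleanup.

```ruby
# Hypothetical stand-in for a request method with work after the yield.
def fake_request(log)
  log << :open
  yield :response
  log << :finish_after_yield  # skipped when the block returns from its caller
ensure
  log << :ensure_cleanup      # runs in every case
end

def find_with_return(log)
  fake_request(log) { |r| return r }  # leaves find_with_return at once
  :unreachable
end

def find_with_assignment(log)
  result = nil
  fake_request(log) { |r| result = r }  # lets fake_request run to its end
  result
end

log_a = []
find_with_return(log_a)
p log_a  # prints [:open, :ensure_cleanup]

log_b = []
find_with_assignment(log_b)
p log_b  # prints [:open, :finish_after_yield, :ensure_cleanup]
```

Both variants get the response back, which would explain why the original code appeared to work even though it cut the yielding method short.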
>> If this code works, I'm obviously missing something. Any idea what?
> Your ruby is way better than mine, so I'd tend to trust what you say :-)
I don't always trust what I say, so no reason you should.
If the code overall appears to produce the correct results I suspect that it isn't doing exactly as either of us expect; is it possible that it's not getting to that code, or could some threading issues be confusing the situation?
>>> If this code works, I'm obviously missing something. Any idea what?
>> Your ruby is way better than mine, so I'd tend to trust what you say :-)
> I don't always trust what I say, so no reason you should.
> If the code overall appears to produce the correct results I suspect that it
> isn't doing exactly as either of us expect; is it possible that it's not
> getting to that code, or could some threading issues be confusing the
> situation?
Obviously the response gets deserialized, so I'd say it works. It's just that, based on your sample, it seems I'm short-circuiting the Net::HTTP "request shutdown", which is certainly bad :-)
> I am confused about a number of things with this patch; I'll pick out
> one that seems the easiest to explain in the hopes that the answer
> will align my thinking:
> I don't see how the following could work:
>  def find(request)
> -    return nil unless result = deserialize(network(request).get(indirection2uri(request), headers))
> +    return nil unless result = network(request).request_get(indirection2uri(request), headers) do |response|
> +        return deserialize(response)
> +    end
>      result.name = request.key
>      result
>  end
I rewrote this part as I sent earlier and indeed this still works, except it wasn't doing the result.name= stuff. It allowed me to discover that this result.name= was failing hard when trying to fetch file metadata (those have no "name"). This is also fixed in the new patch revision.
Hi Brice (and Markus, I guess),
What's the status of this patch series? I'd love to get this into rowlf if at all possible, and I'll work with you to get it done if you need some backup on it.
--
I respect faith, but doubt is what gets you an education.
    -- Wilson Mizner
---------------------------------------------------------------------
Luke Kanies -|- http://reductivelabs.com -|- +1(615)594-8199
We're intending to include it (or some variation thereof) if at all possible. Rein and I chatted about it over the weekend and he bookmarked interest in proceeding after he gets through his dashboard TTD list (probably tomorrow or Wednesday according to his status right after lunch today).