Following one of my blog posts[1] about puppetd memory consumption, I
decided to contribute my own piece to the effort.
Since #2892 is almost fixed by now, I decided to spend my time on
something else:
1) the JSON catalog (and its deserialization)
2) sourced file content
Both can consume a lot of memory for a short time (memory which unfortunately
stays consumed afterwards, see my blog post for some explanations).
So this patch brings to puppet:
1) support for a new JSON parser: Yajl-ruby[2], which is faster and uses
less memory than the regular JSON parser. Yajl-ruby is available as a gem.
To enable it, run with --preferred_serialization_format=yajl
2) use of Yajl-ruby in "streaming" mode: puppetd never reads the whole response
body into RAM, but instead feeds Yajl-ruby chunk by chunk. This way, the JSON
serialized catalog is never fully stored in RAM.
3) the ability for the file resource source system to stream file content to
disk instead of reading the whole response body into RAM and then writing it
to the file.
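As a rough illustration of point 3, here is a minimal sketch of writing a body to disk chunk by chunk while computing its checksum on the fly. The `stream_to_file` helper and the array of chunks are hypothetical names made up for this example, not code from the patch.

```ruby
require 'digest/md5'
require 'tempfile'

# Hypothetical helper (not part of the patch): write each chunk to +path+
# as it arrives and feed it to +digest+, so the whole body never has to
# be held in memory at once. Returns the hex checksum of everything written.
def stream_to_file(chunks, path, digest = Digest::MD5.new)
  File.open(path, 'wb') do |file|
    chunks.each do |chunk|
      file.write(chunk)
      digest << chunk
    end
  end
  digest.hexdigest
end

# Stand-in for a chunked HTTP body.
chunks = ["hello ", "streamed ", "world"]
target = Tempfile.new('streamed')
target.close
checksum = stream_to_file(chunks, target.path)
```

With Net::HTTP, the same loop could be driven by `response.read_body { |chunk| ... }` inside a `request_get` block, which yields the body in fragments instead of building one big string.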
This patch has been lightly tested and seems to work, though YMMV. I also
tried to keep the patch as minimal as possible.
I took some figures while puppetd was sourcing a 100MiB file:
                Start     Peak
0.25.4  VSZ    612860   925772
patched VSZ    611936   644104
0.25.4  RES     12988   298104
patched RES     11704    43536
Which I think speaks for itself :-)
Now to be clear: this patch is a quick and dirty hack. I don't expect it to be
merged as is, so I'm waiting for reviews, tests, comments, flames. I even think
this patch brings a whole new series of issues and bugs :-)
Note: this patch introduces a new issue regarding #2892: since we no longer
have the whole serialized catalog in memory, it will be hard to dump it as is
to disk for the local cache. Something clever will need to be done (like streaming
the cache...).
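One possible direction, sketched here under assumptions only: tee each chunk to the cache file while feeding it to the parser, so the serialized catalog still never sits fully in memory. `TeeStream` is a hypothetical name, and the sink is only assumed to respond to `<<` (which is how the patch feeds the Yajl parser). A plain String stands in for the parser below.

```ruby
require 'tempfile'

# Hypothetical sketch (not part of the patch): forward every chunk to a
# sink (anything responding to <<) while also appending it to a cache file.
class TeeStream
  def initialize(sink, cache_path)
    @sink = sink
    @cache_path = cache_path
  end

  # Consume +chunks+ one at a time and return the sink.
  def consume(chunks)
    File.open(@cache_path, 'wb') do |cache|
      chunks.each do |chunk|
        cache.write(chunk)
        @sink << chunk
      end
    end
    @sink
  end
end

cache = Tempfile.new('catalog-cache')
cache.close
# An unfrozen empty String plays the role of the streaming parser here.
parsed = TeeStream.new(+'', cache.path).consume(['{"type":"Catalog",', '"data":{}}'])
```

This only shows the plumbing. Whether the cache should hold the raw serialized form or something else is left open.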
Thanks,
Brice
[1]: http://www.masterzen.fr/2010/01/28/puppet-memory-usage-not-a-fatality/
[2]: http://github.com/brianmario/yajl-ruby
Brice Figureau (5):
Add stream format capability
Add Yajl-ruby json parser format
Add a way to capture checksums on a "stream"
Let the rest client know how to handle stream response
Allows puppet:// content client streaming
lib/puppet/feature/yajl.rb | 24 ++++
lib/puppet/file_serving/content_stream.rb | 23 +++
lib/puppet/indirector/file_content/rest.rb | 10 ++-
lib/puppet/indirector/rest.rb | 14 ++-
lib/puppet/network/deferred_response.rb | 94 +++++++++++++
lib/puppet/network/format.rb | 4 +
lib/puppet/network/format_handler.rb | 23 +++-
lib/puppet/network/formats.rb | 112 +++++++++++++++
lib/puppet/network/response_stream.rb | 45 ++++++
lib/puppet/resource.rb | 2 +-
lib/puppet/type/file.rb | 19 +++-
lib/puppet/util/checksum_stream.rb | 20 +++
lib/puppet/util/checksums.rb | 12 ++
spec/integration/network/deferred_response.rb | 34 +++++
spec/integration/network/formats.rb | 82 +++++++++++-
spec/unit/file_serving/content_stream.rb | 29 ++++
spec/unit/indirector/file_content/rest.rb | 30 ++++-
spec/unit/indirector/rest.rb | 45 ++++---
spec/unit/network/deferred_response.rb | 182 +++++++++++++++++++++++++
spec/unit/network/format.rb | 4 +
spec/unit/network/format_handler.rb | 33 +++++
spec/unit/network/formats.rb | 110 +++++++++++++++-
spec/unit/network/response_stream.rb | 60 ++++++++
spec/unit/type/file.rb | 32 +++++
spec/unit/util/checksum_stream.rb | 23 +++
spec/unit/util/checksums.rb | 12 ++
26 files changed, 1040 insertions(+), 38 deletions(-)
create mode 100644 lib/puppet/feature/yajl.rb
create mode 100644 lib/puppet/file_serving/content_stream.rb
create mode 100644 lib/puppet/network/deferred_response.rb
create mode 100644 lib/puppet/network/response_stream.rb
create mode 100644 lib/puppet/util/checksum_stream.rb
create mode 100644 spec/integration/network/deferred_response.rb
create mode 100644 spec/unit/file_serving/content_stream.rb
create mode 100644 spec/unit/network/deferred_response.rb
create mode 100644 spec/unit/network/response_stream.rb
create mode 100644 spec/unit/util/checksum_stream.rb
diff --git a/lib/puppet/util/checksum_stream.rb b/lib/puppet/util/checksum_stream.rb
new file mode 100644
index 0000000..0add5fd
--- /dev/null
+++ b/lib/puppet/util/checksum_stream.rb
@@ -0,0 +1,20 @@
+
+class Puppet::Util::ChecksumStream
+ attr_accessor :digest
+
+ def initialize(digest)
+ @digest = digest.reset
+ end
+
+ def update(chunk)
+ digest << chunk
+ end
+
+ def checksum
+ digest.hexdigest
+ end
+
+ def to_s
+ "checksum: #{digest}"
+ end
+end
\ No newline at end of file
diff --git a/lib/puppet/util/checksums.rb b/lib/puppet/util/checksums.rb
index 98bf5de..1b73a86 100644
--- a/lib/puppet/util/checksums.rb
+++ b/lib/puppet/util/checksums.rb
@@ -1,3 +1,5 @@
+require 'puppet/util/checksum_stream'
+
# A stand-alone module for calculating checksums
# in a generic way.
module Puppet::Util::Checksums
@@ -34,6 +36,11 @@ module Puppet::Util::Checksums
md5_file(filename, true)
end
+ def md5_stream
+ require 'digest/md5'
+ Puppet::Util::ChecksumStream.new(Digest::MD5.new())
+ end
+
# Return the :mtime timestamp of a file.
def mtime_file(filename)
File.stat(filename).send(:mtime)
@@ -63,6 +70,11 @@ module Puppet::Util::Checksums
sha1_file(filename, true)
end
+ def sha1_stream
+ require 'digest/sha1'
+ Puppet::Util::ChecksumStream.new(Digest::SHA1.new())
+ end
+
# Return the :ctime of a file.
def ctime_file(filename)
File.stat(filename).send(:ctime)
diff --git a/spec/unit/util/checksum_stream.rb b/spec/unit/util/checksum_stream.rb
new file mode 100644
index 0000000..0ace637
--- /dev/null
+++ b/spec/unit/util/checksum_stream.rb
@@ -0,0 +1,23 @@
+
+require File.dirname(__FILE__) + '/../../spec_helper'
+
+require 'puppet/util/checksum_stream'
+
+describe Puppet::Util::ChecksumStream do
+
+ before(:each) do
+ @digest = stub 'digest'
+ @digest.stubs(:reset).returns(@digest)
+ @sum = Puppet::Util::ChecksumStream.new(@digest)
+ end
+
+ it "should add to the digest when update" do
+ @digest.expects(:<<).with("content")
+ @sum.update("content")
+ end
+
+ it "should produce the final checksum" do
+ @digest.expects(:hexdigest).returns("DEADBEEF")
+ @sum.checksum.should == "DEADBEEF"
+ end
+end
\ No newline at end of file
diff --git a/spec/unit/util/checksums.rb b/spec/unit/util/checksums.rb
index d31d7a0..a27229e 100755
--- a/spec/unit/util/checksums.rb
+++ b/spec/unit/util/checksums.rb
@@ -15,6 +15,7 @@ describe Puppet::Util::Checksums do
content_sums = [:md5, :md5lite, :sha1, :sha1lite]
file_only = [:ctime, :mtime]
+ stream = [:md5_stream, :sha1_stream]
content_sums.each do |sumtype|
it "should be able to calculate %s sums from strings" % sumtype do
@@ -28,6 +29,12 @@ describe Puppet::Util::Checksums do
end
end
+ stream.each do |sumtype|
+ it "should be able to calculate %s sums from streams" % sumtype do
+ @summer.should be_respond_to(sumtype.to_s)
+ end
+ end
+
it "should have a method for stripping a sum type from an existing checksum" do
@summer.sumtype("{md5}asdfasdfa").should == "md5"
end
@@ -62,6 +69,11 @@ describe Puppet::Util::Checksums do
@summer.send(sum.to_s + "_file", file).should == :mydigest
end
+
+ it "should use #{klass} to seed checksum stream" do
+ @summer.send("#{sum}_stream").should be_instance_of(Puppet::Util::ChecksumStream)
+ @summer.send("#{sum}_stream").digest.should be_instance_of(klass)
+ end
end
end
--
1.6.6.1
Signed-off-by: Brice Figureau <brice-...@daysofwonder.com>
---
lib/puppet/indirector/rest.rb | 14 +++++---
lib/puppet/network/response_stream.rb | 45 ++++++++++++++++++++++++
spec/unit/indirector/rest.rb | 45 +++++++++++++++----------
spec/unit/network/response_stream.rb | 60 +++++++++++++++++++++++++++++++++
4 files changed, 141 insertions(+), 23 deletions(-)
create mode 100644 lib/puppet/network/response_stream.rb
create mode 100644 spec/unit/network/response_stream.rb
diff --git a/lib/puppet/indirector/rest.rb b/lib/puppet/indirector/rest.rb
index a89e986..d3fb5ae 100644
--- a/lib/puppet/indirector/rest.rb
+++ b/lib/puppet/indirector/rest.rb
@@ -3,6 +3,7 @@ require 'uri'
require 'puppet/network/http_pool'
require 'puppet/network/http/api/v1'
+require 'puppet/network/response_stream'
# Access objects via REST
class Puppet::Indirector::REST < Puppet::Indirector::Terminus
@@ -44,10 +45,11 @@ class Puppet::Indirector::REST < Puppet::Indirector::Terminus
content_type = response['content-type'].gsub(/\s*;.*$/,'') # strip any appended charset
# Convert the response to a deserialized object.
+ response = Puppet::Network::ResponseStream.new(response)
if multiple
- model.convert_from_multiple(content_type, response.body)
+ model.convert_from_multiple(content_type, response)
else
- model.convert_from(content_type, response.body)
+ model.convert_from(content_type, response)
end
else
# Raise the http error if we didn't get a 'success' of some kind.
@@ -66,14 +68,16 @@ class Puppet::Indirector::REST < Puppet::Indirector::Terminus
end
def find(request)
- return nil unless result = deserialize(network(request).get(indirection2uri(request), headers))
+ return nil unless result = network(request).request_get(indirection2uri(request), headers) do |response|
+ return deserialize(response)
+ end
result.name = request.key
result
end
def search(request)
- unless result = deserialize(network(request).get(indirection2uri(request), headers), true)
- return []
+ return [] unless result = network(request).request_get(indirection2uri(request), headers) do |response|
+ return deserialize(response, true)
end
return result
end
diff --git a/lib/puppet/network/response_stream.rb b/lib/puppet/network/response_stream.rb
new file mode 100644
index 0000000..4b7c772
--- /dev/null
+++ b/lib/puppet/network/response_stream.rb
@@ -0,0 +1,45 @@
+
+require 'net/http'
+
+# This is a wrapper around either a Net::HTTPResponse or a String
+# allowing the same interface for the exterior world.
+class Puppet::Network::ResponseStream
+
+ attr_accessor :response
+
+ [:code, :body].each do |m|
+ define_method(m) do
+ response.send(m)
+ end
+ end
+
+ def initialize(content)
+ @response = content
+ end
+
+ def stream?
+ response.is_a?(Net::HTTPResponse)
+ end
+
+ def content
+ response.body
+ end
+
+ def length
+ if stream?
+ response.content_length
+ else
+ response.length
+ end
+ end
+
+ def stream
+ if stream?
+ response.read_body do |r|
+ yield r
+ end
+ else
+ yield response.body
+ end
+ end
+end
\ No newline at end of file
diff --git a/spec/unit/indirector/rest.rb b/spec/unit/indirector/rest.rb
index d12e3c6..fb6c03c 100755
--- a/spec/unit/indirector/rest.rb
+++ b/spec/unit/indirector/rest.rb
@@ -123,35 +123,42 @@ describe Puppet::Indirector::REST do
}
it "should return the results of converting from the format specified by the content-type header if the response code is in the 200s" do
- @model.expects(:convert_from).with("myformat", "mydata").returns "myobject"
-
response = mock 'response'
response.stubs(:[]).with("content-type").returns "myformat"
response.stubs(:body).returns "mydata"
response.stubs(:code).returns "200"
+ stream = stub 'stream'
+ Puppet::Network::ResponseStream.expects(:new).with(response).returns(stream)
+
+ @model.expects(:convert_from).with("myformat", stream).returns "myobject"
@searcher.deserialize(response).should == "myobject"
end
it "should convert and return multiple instances if the return code is in the 200s and 'multiple' is specified" do
- @model.expects(:convert_from_multiple).with("myformat", "mydata").returns "myobjects"
-
response = mock 'response'
response.stubs(:[]).with("content-type").returns "myformat"
response.stubs(:body).returns "mydata"
response.stubs(:code).returns "200"
+ stream = stub 'stream'
+ Puppet::Network::ResponseStream.expects(:new).with(response).returns(stream)
+
+ @model.expects(:convert_from_multiple).with("myformat", stream).returns "myobjects"
@searcher.deserialize(response, true).should == "myobjects"
end
it "should strip the content-type header to keep only the mime-type" do
- @model.expects(:convert_from).with("text/plain", "mydata").returns "myobject"
-
response = mock 'response'
response.stubs(:[]).with("content-type").returns "text/plain; charset=utf-8"
response.stubs(:body).returns "mydata"
response.stubs(:code).returns "200"
+ stream = stub 'stream'
+ Puppet::Network::ResponseStream.expects(:new).with(response).returns(stream)
+
+ @model.expects(:convert_from).with("text/plain", stream).returns "myobject"
+
@searcher.deserialize(response)
end
end
@@ -186,7 +193,8 @@ describe Puppet::Indirector::REST do
describe "when doing a find" do
before :each do
- @connection = stub('mock http connection', :get => @response)
+ @connection = stub('mock http connection')
+ @connection.stubs(:request_get).yields(@response)
@searcher.stubs(:network).returns(@connection) # neuter the network connection
# Use a key with spaces, so we can test escaping
@@ -195,27 +203,27 @@ describe Puppet::Indirector::REST do
it "should call the GET http method on a network connection" do
@searcher.expects(:network).returns @connection
- @connection.expects(:get).returns @response
+ @connection.expects(:request_get).yields @response
@searcher.find(@request)
end
it "should deserialize and return the http response" do
- @connection.expects(:get).returns @response
instance = stub 'object', :name= => nil
@searcher.expects(:deserialize).with(@response).returns instance
+ @connection.expects(:request_get).yields(@response).returns(instance)
@searcher.find(@request).should == instance
end
it "should use the URI generated by the Handler module" do
@searcher.expects(:indirection2uri).with(@request).returns "/my/uri"
- @connection.expects(:get).with { |path, args| path == "/my/uri" }.returns(@response)
+ @connection.expects(:request_get).with { |path, args| path == "/my/uri" }.yields(@response)
@searcher.find(@request)
end
it "should provide an Accept header containing the list of supported formats joined with commas" do
- @connection.expects(:get).with { |path, args| args["Accept"] == "supported, formats" }.returns(@response)
+ @connection.expects(:request_get).with { |path, args| args["Accept"] == "supported, formats" }.yields(@response)
@searcher.model.expects(:supported_formats).returns %w{supported formats}
@searcher.find(@request)
@@ -227,7 +235,7 @@ describe Puppet::Indirector::REST do
end
it "should set the name of the resulting instance to the asked-for name" do
- @searcher.expects(:deserialize).with(@response).returns @instance
+ @connection.expects(:request_get).returns @instance
@instance.expects(:name=).with "foo bar"
@searcher.find(@request)
end
@@ -240,7 +248,8 @@ describe Puppet::Indirector::REST do
describe "when doing a search" do
before :each do
- @connection = stub('mock http connection', :get => @response)
+ @connection = stub('mock http connection')
+ @connection.stubs(:request_get).yields(@response)
@searcher.stubs(:network).returns(@connection) # neuter the network connection
@model.stubs(:convert_from_multiple)
@@ -250,12 +259,12 @@ describe Puppet::Indirector::REST do
it "should call the GET http method on a network connection" do
@searcher.expects(:network).returns @connection
- @connection.expects(:get).returns @response
+ @connection.expects(:request_get).yields @response
@searcher.search(@request)
end
it "should deserialize as multiple instances and return the http response" do
- @connection.expects(:get).returns @response
+ @connection.expects(:request_get).yields(@response).returns "myobject"
@searcher.expects(:deserialize).with(@response, true).returns "myobject"
@searcher.search(@request).should == 'myobject'
@@ -263,19 +272,19 @@ describe Puppet::Indirector::REST do
it "should use the URI generated by the Handler module" do
@searcher.expects(:indirection2uri).with(@request).returns "/mys/uri"
- @connection.expects(:get).with { |path, args| path == "/mys/uri" }.returns(@response)
+ @connection.expects(:request_get).with { |path, args| path == "/mys/uri" }.yields(@response)
@searcher.search(@request)
end
it "should provide an Accept header containing the list of supported formats joined with commas" do
- @connection.expects(:get).with { |path, args| args["Accept"] == "supported, formats" }.returns(@response)
+ @connection.expects(:request_get).with { |path, args| args["Accept"] == "supported, formats" }.yields(@response)
@searcher.model.expects(:supported_formats).returns %w{supported formats}
@searcher.search(@request)
end
it "should return an empty array if serialization returns nil" do
- @model.stubs(:convert_from_multiple).returns nil
+ @connection.expects(:request_get).returns nil
@searcher.search(@request).should == []
end
diff --git a/spec/unit/network/response_stream.rb b/spec/unit/network/response_stream.rb
new file mode 100644
index 0000000..08b23b5
--- /dev/null
+++ b/spec/unit/network/response_stream.rb
@@ -0,0 +1,60 @@
+#!/usr/bin/env ruby
+
+require File.dirname(__FILE__) + '/../../spec_helper'
+
+require 'puppet/network/response_stream'
+
+describe Puppet::Network::ResponseStream do
+ before(:each) do
+ @content = stub_everything 'content'
+ @stream = Puppet::Network::ResponseStream.new(@content)
+ end
+
+ it "should identify itself as a stream if the underlying content is an http response" do
+ @content.expects(:is_a?).with(Net::HTTPResponse).returns(true)
+ @stream.should be_stream
+ end
+
+ it "should not identify itself as a stream if the underlying content is not an http response" do
+ @content.expects(:is_a?).with(Net::HTTPResponse).returns(false)
+ @stream.should_not be_stream
+ end
+
+ it "should be able to return content length" do
+ @stream.should respond_to(:length)
+ end
+
+ it "should be able to stream content" do
+ @stream.should respond_to(:stream)
+ end
+
+ describe "when asking for content length" do
+ it "should return the content-length header if it is a stream" do
+ @stream.stubs(:stream?).returns(true)
+ @content.expects(:content_length).returns 10
+ @stream.length.should == 10
+ end
+
+ it "should return the string length otherwise" do
+ @content.expects(:length).returns 10
+ @stream.length.should == 10
+ end
+ end
+
+ describe "when streaming" do
+ it "should yield the block to the response read_body method if it is a stream" do
+ @stream.stubs(:stream?).returns(true)
+ @content.expects(:read_body).yields("chunk")
+ @stream.stream do |chunk|
+ chunk.should == "chunk"
+ end
+ end
+
+ it "should yield the full body if it is not a stream" do
+ @content.expects(:body).returns("body")
+ @stream.stream do |chunk|
+ chunk.should == "body"
+ end
+ end
+ end
+end
\ No newline at end of file
--
1.6.6.1
Signed-off-by: Brice Figureau <brice-...@daysofwonder.com>
---
lib/puppet/network/format.rb | 4 ++++
lib/puppet/network/format_handler.rb | 23 ++++++++++++++++++++---
spec/unit/network/format.rb | 4 ++++
spec/unit/network/format_handler.rb | 33 +++++++++++++++++++++++++++++++++
4 files changed, 61 insertions(+), 3 deletions(-)
diff --git a/lib/puppet/network/format.rb b/lib/puppet/network/format.rb
index d781242..b568aa7 100644
--- a/lib/puppet/network/format.rb
+++ b/lib/puppet/network/format.rb
@@ -89,6 +89,10 @@ class Puppet::Network::Format
suitable? and required_methods_present?(klass)
end
+ def support_stream?
+ false
+ end
+
def to_s
"Puppet::Network::Format[%s]" % name
end
diff --git a/lib/puppet/network/format_handler.rb b/lib/puppet/network/format_handler.rb
index ea8cf35..1435b99 100644
--- a/lib/puppet/network/format_handler.rb
+++ b/lib/puppet/network/format_handler.rb
@@ -23,7 +23,7 @@ module Puppet::Network::FormatHandler
@format = format
end
- [:intern, :intern_multiple, :render, :render_multiple, :mime].each do |method|
+ [:intern, :intern_multiple, :render, :render_multiple, :mime, :support_stream?].each do |method|
define_method(method) do |*args|
protect(method, args)
end
@@ -93,12 +93,21 @@ module Puppet::Network::FormatHandler
Puppet::Network::FormatHandler
end
+ def decapsulate(format, data)
+ format = format_handler.protected_format(format)
+
+ data = data.content if !format.support_stream? and data.respond_to?(:stream?) and data.stream?
+ [format, data]
+ end
+
def convert_from(format, data)
- format_handler.protected_format(format).intern(self, data)
+ format, data = decapsulate(format, data)
+ format.intern(self, data)
end
def convert_from_multiple(format, data)
- format_handler.protected_format(format).intern_multiple(self, data)
+ format, data = decapsulate(format, data)
+ format.intern_multiple(self, data)
end
def render_multiple(format, instances)
@@ -113,6 +122,10 @@ module Puppet::Network::FormatHandler
Puppet::Network::FormatHandler.format(name).supported?(self)
end
+ def support_stream?(name)
+ Puppet::Network::FormatHandler.format(name).support_stream?
+ end
+
def supported_formats
result = format_handler.formats.collect { |f| format_handler.format(f) }.find_all { |f| f.supported?(self) }.collect { |f| f.name }.sort do |a, b|
# It's an inverse sort -- higher weight formats go first.
@@ -164,6 +177,10 @@ module Puppet::Network::FormatHandler
def support_format?(name)
self.class.support_format?(name)
end
+
+ def support_stream?(name)
+ self.class.support_stream?(name)
+ end
end
end
diff --git a/spec/unit/network/format.rb b/spec/unit/network/format.rb
index e5a6b59..feefd68 100755
--- a/spec/unit/network/format.rb
+++ b/spec/unit/network/format.rb
@@ -66,6 +66,10 @@ describe Puppet::Network::Format do
@format.should respond_to(:supported?)
end
+ it "should be able to tell it supports streaming" do
+ @format.should respond_to(:support_stream?)
+ end
+
it "should consider a class to be supported if it has the individual and multiple methods for rendering and interning" do
@format.should be_supported(FormatRenderer)
end
diff --git a/spec/unit/network/format_handler.rb b/spec/unit/network/format_handler.rb
index 110effe..a5a68ea 100755
--- a/spec/unit/network/format_handler.rb
+++ b/spec/unit/network/format_handler.rb
@@ -117,6 +117,7 @@ describe Puppet::Network::FormatHandler do
@format = mock 'format'
@format.stubs(:supported?).returns true
@format.stubs(:name).returns :my_format
+ @format.stubs(:support_stream?).returns false
Puppet::Network::FormatHandler.stubs(:format).with(:my_format).returns @format
Puppet::Network::FormatHandler.stubs(:mime).with("text/myformat").returns @format
Puppet::Network::Format.stubs(:===).returns false
@@ -132,6 +133,15 @@ describe Puppet::Network::FormatHandler do
FormatTester.support_format?(:my_format)
end
+ it "should be able to test whether a format supports streaming" do
+ FormatTester.should respond_to(:support_stream?)
+ end
+
+ it "should use the Format to determine whether a given format supports streaming" do
+ @format.expects(:support_stream?)
+ FormatTester.support_stream?(:my_format)
+ end
+
it "should be able to convert from a given format" do
FormatTester.should respond_to(:convert_from)
end
@@ -141,6 +151,29 @@ describe Puppet::Network::FormatHandler do
FormatTester.convert_from(:my_format, "mydata")
end
+ it "should not decapsulate the content when converting with a format that supports stream" do
+ data = stub_everything 'data'
+ data.expects(:content).never
+ @format.expects(:support_stream?).returns(true)
+ @format.expects(:intern).with(FormatTester, data)
+ FormatTester.convert_from(:my_format, data)
+ end
+
+ it "should not decapsulate the content when converting streamable data" do
+ data = stub_everything 'data', :stream? => true
+ data.expects(:content).never
+ @format.expects(:support_stream?).returns(true)
+ @format.expects(:intern).with(FormatTester, data)
+ FormatTester.convert_from(:my_format, data)
+ end
+
+ it "should decapsulate the content when converting streamable data with a non streamable format" do
+ data = stub_everything 'data', :stream? => true
+ data.expects(:content).returns("mydata")
+ @format.expects(:intern).with(FormatTester, "mydata")
+ FormatTester.convert_from(:my_format, data)
+ end
+
it "should call the format-specific converter when asked to convert from a given format by mime-type" do
@format.expects(:intern).with(FormatTester, "mydata")
FormatTester.convert_from("text/myformat", "mydata")
--
1.6.6.1
Apparently pson serializes plain Ruby objects by calling to_s, whereas
Yajl puts their public properties in a hash. Since ResourceReference
doesn't have a specific to_pson_data_hash, we force it to serialize
as a string, which gives the same result as with pson.
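A minimal sketch of the idea, under assumptions: the `Reference` and `Resource` classes below are simplified stand-ins written for this example, and the stdlib JSON encoder is used in place of Yajl's so the snippet runs without the gem.

```ruby
require 'json'

# Hypothetical stand-in for a resource reference.
class Reference
  def initialize(type, title)
    @type, @title = type, title
  end

  def to_s
    "#{@type}[#{@title}]"
  end
end

# Hypothetical stand-in for a resource holding parameters.
class Resource
  def initialize(params)
    @params = params
  end

  # References are turned into strings before encoding.
  def to_pson_data_hash
    @params.inject({}) do |hash, (param, value)|
      hash[param] = value.is_a?(Reference) ? value.to_s : value
      hash
    end
  end
end

# Recursively convert anything that knows to_pson_data_hash into plain data.
def instance_to_data(instance)
  instance = instance.to_pson_data_hash if instance.respond_to?(:to_pson_data_hash)
  case instance
  when Hash
    instance.inject({}) { |h, (k, v)| h[k] = instance_to_data(v); h }
  when Array
    instance.collect { |i| instance_to_data(i) }
  else
    instance
  end
end

resource = Resource.new('require' => Reference.new('File', '/tmp/foo'), 'mode' => '0644')
encoded = JSON.generate(instance_to_data(resource))
```

The reference ends up in the output as the plain string "File[/tmp/foo]" rather than as a hash of its instance variables.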
Signed-off-by: Brice Figureau <brice-...@daysofwonder.com>
---
lib/puppet/feature/yajl.rb | 24 ++++++++
lib/puppet/network/formats.rb | 108 +++++++++++++++++++++++++++++++++++
lib/puppet/resource.rb | 2 +-
spec/integration/network/formats.rb | 82 ++++++++++++++++++++++++++-
spec/unit/network/formats.rb | 106 +++++++++++++++++++++++++++++++++-
5 files changed, 317 insertions(+), 5 deletions(-)
create mode 100644 lib/puppet/feature/yajl.rb
diff --git a/lib/puppet/feature/yajl.rb b/lib/puppet/feature/yajl.rb
new file mode 100644
index 0000000..b4d4954
--- /dev/null
+++ b/lib/puppet/feature/yajl.rb
@@ -0,0 +1,24 @@
+require 'puppet/util/feature'
+
+# We want this to load if possible, but it's not automatically
+# required.
+Puppet.features.rubygems?
+Puppet.features.add(:yajl) do
+ found = false
+ begin
+ require 'rubygems'
+ require 'yajl'
+
+ #Yajl::Encoder.enable_json_gem_compatability
+
+ class ::Object
+ def to_pson(*args, &block)
+ "\"#{to_s}\""
+ end
+ end
+
+ found = true
+ rescue LoadError => detail
+ end
+ found
+end
diff --git a/lib/puppet/network/formats.rb b/lib/puppet/network/formats.rb
index a98dcbc..8b9b68d 100644
--- a/lib/puppet/network/formats.rb
+++ b/lib/puppet/network/formats.rb
@@ -186,3 +186,111 @@ Puppet::Network::FormatHandler.create(:pson, :mime => "text/pson", :weight => 10
klass.from_pson(data)
end
end
+
+Puppet::Network::FormatHandler.create(:yajl, :mime => "text/yajl", :weight => 20) do
+ confine :true => Puppet.features.yajl?
+
+ class Puppet::ParsingComplete
+ attr_accessor :result
+
+ def initialize
+ @result = []
+ end
+
+ def complete(object)
+ @result << object
+ end
+ end
+
+ module ::PSON
+ alias :old_parse :parse
+ def parse(string)
+ Yajl::Parser.parse(string)
+ end
+ end
+
+ def parse(content)
+ if content.respond_to?(:stream?)
+ unless content.stream?
+ Yajl::Parser.parse(content.content)
+ else
+ complete = Puppet::ParsingComplete.new
+ parser = Yajl::Parser.new
+ parser.on_parse_complete = complete.method(:complete)
+ content.stream do |r|
+ parser << r
+ end
+ result = complete.result
+ return result.shift if result.size == 1
+ result
+ end
+ else
+ Yajl::Parser.parse(content)
+ end
+ end
+
+ def intern(klass, content)
+ data_to_instance(klass, parse(content))
+ end
+
+ def intern_multiple(klass, content)
+ parse(content).collect do |data|
+ data_to_instance(klass, data)
+ end
+ end
+
+ def render(instance)
+ Yajl::Encoder.encode(instance_to_data(instance))
+ end
+
+ def render_multiple(instances)
+ out = ""
+ encoder = Yajl::Encoder.new
+ instances.collect do |i|
+ out << encoder.encode(instance_to_data(i))
+ end
+ out
+ end
+
+ # supported only for brave souls installing yajl
+ def supported?(klass)
+ Puppet.features.yajl?
+ end
+
+ def support_stream?
+ # of course we do
+ true
+ end
+
+ # If they pass class information, we want to ignore it. By default,
+ # we'll include class information but we won't rely on it - we don't
+ # want class names to be required because we then can't change our
+ # internal class names, which is bad.
+ def data_to_instance(klass, data)
+ if data.is_a?(Hash) and d = data['data']
+ data = d
+ end
+ if data.is_a?(klass)
+ return data
+ end
+ klass.from_pson(data)
+ end
+
+ # recursively call to_pson_data_hash on objects
+ # supporting it
+ def instance_to_data(instance)
+ instance = instance.to_pson_data_hash if instance.respond_to?(:to_pson_data_hash)
+ case instance
+ when Hash
+ instance = instance.inject({}) do |h, (k,v)|
+ h[k] = instance_to_data(v)
+ h
+ end
+ when Array
+ instance.collect! do |i|
+ instance_to_data(i)
+ end
+ end
+ instance
+ end
+end
diff --git a/lib/puppet/resource.rb b/lib/puppet/resource.rb
index 91dd547..2bf99c0 100644
--- a/lib/puppet/resource.rb
+++ b/lib/puppet/resource.rb
@@ -53,7 +53,7 @@ class Puppet::Resource
# Don't duplicate the title as the namevar
next hash if param == namevar and value == title
- hash[param] = value
+ hash[param] = value.is_a?(Puppet::Resource::Reference) ? value.to_s : value
hash
end
diff --git a/spec/integration/network/formats.rb b/spec/integration/network/formats.rb
index 35e7977..6eef347 100755
--- a/spec/integration/network/formats.rb
+++ b/spec/integration/network/formats.rb
@@ -18,11 +18,15 @@ class PsonIntTest
@string = string
end
- def to_pson(*args)
+ def to_pson_data_hash
{
'type' => self.class.name,
'data' => [@string]
- }.to_pson(*args)
+ }
+ end
+
+ def to_pson(*args)
+ to_pson_data_hash.to_pson(*args)
end
def self.canonical_order(s)
@@ -108,3 +112,77 @@ describe Puppet::Network::FormatHandler.format(:pson) do
end
end
end
+
+describe Puppet::Network::FormatHandler.format(:yajl) do
+ describe "when yajl is absent" do
+ confine "'yajl' library is present" => (! Puppet.features.yajl?)
+
+ before do
+ @yajl = Puppet::Network::FormatHandler.format(:yajl)
+ end
+
+ it "should not be suitable" do
+ @yajl.should_not be_suitable
+ end
+
+ it "should not be supported" do
+ @yajl.should_not be_supported
+ end
+ end
+
+ describe "when yajl is available" do
+ confine "Missing 'yajl' library" => Puppet.features.yajl?
+
+ before do
+ @yajl = Puppet::Network::FormatHandler.format(:yajl)
+ end
+
+ it "should be able to render an instance to json" do
+ instance = PsonIntTest.new("foo")
+ PsonIntTest.canonical_order(@yajl.render(instance)).should == PsonIntTest.canonical_order('{"type":"PsonIntTest","data":["foo"]}' )
+ end
+
+ it "should be able to render arrays to json" do
+ @yajl.render([1,2]).should == '[1,2]'
+ end
+
+ it "should be able to render arrays containing hashes to json" do
+ @yajl.render([{"one"=>1},{"two"=>2}]).should == '[{"one":1},{"two":2}]'
+ end
+
+ it "should be able to render multiple instances to json" do
+ Puppet.features.add(:yajl, :libs => %w{yajl})
+
+ one = PsonIntTest.new("one")
+ two = PsonIntTest.new("two")
+
+ PsonIntTest.canonical_order(@yajl.render([one,two])).should == PsonIntTest.canonical_order('[{"type":"PsonIntTest","data":["one"]},{"type":"PsonIntTest","data":["two"]}]')
+ end
+
+ it "should be able to intern a stream" do
+ content = stub 'stream', :stream? => true
+ content.expects(:stream).multiple_yields('{"type":"PsonIntTest",', '"data":["foo"]}')
+ @yajl.intern(PsonIntTest, content).should == PsonIntTest.new("foo")
+ end
+
+ it "should be able to intern json into an instance" do
+ @yajl.intern(PsonIntTest, '{"type":"PsonIntTest","data":["foo"]}').should == PsonIntTest.new("foo")
+ end
+
+ it "should be able to intern json with no class information into an instance" do
+ @yajl.intern(PsonIntTest, '["foo"]').should == PsonIntTest.new("foo")
+ end
+
+ it "should be able to intern multiple instances from json" do
+ @yajl.intern_multiple(PsonIntTest, '[{"type": "PsonIntTest", "data": ["one"]},{"type": "PsonIntTest", "data": ["two"]}]').should == [
+ PsonIntTest.new("one"), PsonIntTest.new("two")
+ ]
+ end
+
+ it "should be able to intern multiple instances from json with no class information" do
+ @yajl.intern_multiple(PsonIntTest, '[["one"],["two"]]').should == [
+ PsonIntTest.new("one"), PsonIntTest.new("two")
+ ]
+ end
+ end
+end
diff --git a/spec/unit/network/formats.rb b/spec/unit/network/formats.rb
index a241306..208c6f4 100755
--- a/spec/unit/network/formats.rb
+++ b/spec/unit/network/formats.rb
@@ -18,11 +18,15 @@ class PsonTest
@string = string
end
- def to_pson(*args)
+ def to_pson_data_hash
{
'type' => self.class.name,
'data' => @string
- }.to_pson(*args)
+ }
+ end
+
+ def to_pson(*args)
+ to_pson_data_hash.to_pson(*args)
end
end
@@ -362,4 +366,102 @@ describe "Puppet Network Format" do
end
end
end
+
+
+ it "should include a yajl format" do
+ Puppet::Network::FormatHandler.format(:yajl).should_not be_nil
+ end
+
+ describe "yajl" do
+ confine "Missing 'yajl' library" => Puppet.features.yajl?
+
+ before do
+ @yajl = Puppet::Network::FormatHandler.format(:yajl)
+ end
+
+ it "should have its mime type set to text/yajl" do
+ Puppet::Network::FormatHandler.format(:yajl).mime.should == "text/yajl"
+ end
+
+ it "should require the :render_method" do
+ Puppet::Network::FormatHandler.format(:yajl).required_methods.should be_include(:render_method)
+ end
+
+ it "should require the :intern_method" do
+ Puppet::Network::FormatHandler.format(:yajl).required_methods.should be_include(:intern_method)
+ end
+
+ it "should have a weight of 20" do
+ @yajl.weight.should == 20
+ end
+
+ describe "when supported" do
+ it "should render by calling 'to_pson_data_hash' on the instance" do
+ instance = PsonTest.new("foo")
+ instance.expects(:to_pson_data_hash).returns "foo"
+ @yajl.render(instance).should == "\"foo\""
+ end
+
+ it "should render multiple instances by calling 'to_pson_data_hash' on each element array" do
+ instance = mock "instance"
+ instances = [instance]
+
+ instance.expects(:to_pson_data_hash).returns "foo"
+
+ @yajl.render_multiple(instances).should == "\"foo\""
+ end
+
+ it "should intern by calling 'Yajl::Parser.parse' on the text and then using from_pson to convert the data into an instance" do
+ content = stub 'foo', :stream? => false, :content => "foo"
+ Yajl::Parser.expects(:parse).with("foo").returns("type" => "PsonTest", "data" => "foo")
+ PsonTest.expects(:from_pson).with("foo").returns "parsed_yajl"
+ @yajl.intern(PsonTest, content).should == "parsed_yajl"
+ end
+
+ it "should intern by calling 'Yajl::Parser.parse' in stream mode if content is streamable" do
+ content = stub 'foo', :stream? => true
+ content.expects(:stream).yields "foo"
+ parser = stub_everything 'parser'
+ Yajl::Parser.expects(:new).returns(parser)
+ parser.expects(:<<).with("foo").returns("type" => "PsonTest", "data" => "foo")
+ @yajl.intern(PsonTest, content)
+ end
+
+ it "should return parsed objects in stream mode" do
+ content = stub 'foo', :stream? => true
+ content.stubs(:stream).yields "foo"
+ parsing_complete = stub 'parsing_complete'
+ parsing_complete.expects(:method)
+ Puppet::ParsingComplete.expects(:new).returns(parsing_complete)
+ parser = stub_everything 'parser'
+ Yajl::Parser.expects(:new).returns(parser)
+ parser.expects(:<<).with("foo").returns("type" => "PsonTest", "data" => "foo")
+ parsing_complete.expects(:result).returns([])
+ @yajl.intern(PsonTest, content)
+ end
+
+ it "should not render twice if 'Yajl::Parser.parse' creates the appropriate instance" do
+ text = stub 'foo', :stream? => false, :content => "foo"
+ instance = PsonTest.new("foo")
+ Yajl::Parser.expects(:parse).with("foo").returns(instance)
+ PsonTest.expects(:from_pson).never
+ @yajl.intern(PsonTest, text).should equal(instance)
+ end
+
+ it "should intern by calling 'Yajl::Parser.parse' on the text and then using from_pson to convert the actual into an instance if the yajl has no class/data separation" do
+ text = stub 'foo', :stream? => false, :content => "foo"
+ Yajl::Parser.expects(:parse).with("foo").returns("foo")
+ PsonTest.expects(:from_pson).with("foo").returns "parsed_yajl"
+ @yajl.intern(PsonTest, text).should == "parsed_yajl"
+ end
+
+ it "should intern multiples by parsing the text and using 'class.intern' on each resulting data structure" do
+ text = stub 'foo', :stream? => false, :content => "foo"
+ Yajl::Parser.expects(:parse).with("foo").returns ["bar", "baz"]
+ PsonTest.expects(:from_pson).with("bar").returns "BAR"
+ PsonTest.expects(:from_pson).with("baz").returns "BAZ"
+ @yajl.intern_multiple(PsonTest, text).should == %w{BAR BAZ}
+ end
+ end
+ end
end
--
1.6.6.1
Signed-off-by: Brice Figureau <brice-...@daysofwonder.com>
---
lib/puppet/file_serving/content_stream.rb | 23 +++
lib/puppet/indirector/file_content/rest.rb | 10 ++-
lib/puppet/network/deferred_response.rb | 94 +++++++++++++
lib/puppet/network/formats.rb | 4 +
lib/puppet/type/file.rb | 19 +++-
spec/integration/network/deferred_response.rb | 34 +++++
spec/unit/file_serving/content_stream.rb | 29 ++++
spec/unit/indirector/file_content/rest.rb | 30 ++++-
spec/unit/network/deferred_response.rb | 182 +++++++++++++++++++++++++
spec/unit/network/formats.rb | 4 +
spec/unit/type/file.rb | 32 +++++
11 files changed, 454 insertions(+), 7 deletions(-)
create mode 100644 lib/puppet/file_serving/content_stream.rb
create mode 100644 lib/puppet/network/deferred_response.rb
create mode 100644 spec/integration/network/deferred_response.rb
create mode 100644 spec/unit/file_serving/content_stream.rb
create mode 100644 spec/unit/network/deferred_response.rb
diff --git a/lib/puppet/file_serving/content_stream.rb b/lib/puppet/file_serving/content_stream.rb
new file mode 100644
index 0000000..2f0b1a7
--- /dev/null
+++ b/lib/puppet/file_serving/content_stream.rb
@@ -0,0 +1,23 @@
+#
+# Created by Luke Kanies on 2007-10-16.
+# Copyright (c) 2007. All rights reserved.
+
+require 'puppet/indirector'
+require 'puppet/file_serving'
+require 'puppet/file_serving/content'
+
+# A class that handles retrieving file contents.
+# It only reads the file when its content is specifically
+# asked for.
+class Puppet::FileServing::ContentStream < Puppet::FileServing::Content
+
+ def self.create(content)
+ instance = new("/this/is/a/fake/path")
+ instance.content = content
+ instance
+ end
+
+ def self.from_raw(content)
+ content
+ end
+end
diff --git a/lib/puppet/indirector/file_content/rest.rb b/lib/puppet/indirector/file_content/rest.rb
index 7b3cade..6da9103 100644
--- a/lib/puppet/indirector/file_content/rest.rb
+++ b/lib/puppet/indirector/file_content/rest.rb
@@ -2,10 +2,18 @@
# Created by Luke Kanies on 2007-10-18.
# Copyright (c) 2007. All rights reserved.
-require 'puppet/file_serving/content'
+require 'puppet/file_serving/content_stream'
require 'puppet/indirector/file_content'
require 'puppet/indirector/rest'
+require 'puppet/network/deferred_response'
class Puppet::Indirector::FileContent::Rest < Puppet::Indirector::REST
desc "Retrieve file contents via a REST HTTP interface."
+
+ # let's cheat a little bit:
+ # instead of returning a "model" we're returning something pretending to be a model
+ # which instead will fire a deferred request when needed
+ def find(request)
+ return Puppet::FileServing::ContentStream.create(Puppet::Network::DeferredResponse.new(request, indirection2uri(request), headers, self))
+ end
end
diff --git a/lib/puppet/network/deferred_response.rb b/lib/puppet/network/deferred_response.rb
new file mode 100644
index 0000000..9175e72
--- /dev/null
+++ b/lib/puppet/network/deferred_response.rb
@@ -0,0 +1,94 @@
+require 'net/http'
+require 'thread'
+
+class Puppet::Network::DeferredResponse
+
+ attr_accessor :request, :uri, :headers, :rest
+ attr_accessor :response
+ attr_accessor :request_started
+
+ def initialize(request, uri, headers, rest)
+ @request = request
+ @uri = uri
+ @headers = headers
+ @rest = rest
+ @mutex = Mutex.new
+ @chunk_queue = SizedQueue.new(1)
+ @got_response = ConditionVariable.new
+ @chunk = nil
+ @request_started = false
+ end
+
+ def stream?
+ true
+ end
+
+ def length
+ start_request
+ response.content_length
+ end
+
+ def start_request
+ # bail out early if we already started the request
+ return if request_started?
+
+ # launch the network request
+ Thread.new do
+ rest.network(request).request_get(uri, headers) do |response|
+ # we got a response from server
+ @mutex.synchronize do
+ @request_started = true
+ @response = response
+ @got_response.signal
+ end
+
+ unless content = rest.deserialize(response)
+ fail "Could not find any content at %s" % request
+ end
+
+ stream_response(content.content)
+ end
+ end
+
+ # wait for the start of the response to be available
+ @mutex.synchronize do
+ @got_response.wait(@mutex) unless @request_started
+ end
+ end
+
+ def stream
+ start_request
+ while true do
+ unless chunk = @chunk_queue.pop
+ # it's the end
+ break
+ end
+
+ @checksum.update(chunk) if @checksum
+
+ yield chunk
+ end
+ return @checksum
+ end
+
+ # if we want the stream to be checksummed on the fly
+ # either set a Digest or a Puppet::Util::ChecksumStream
+ def checksum=(checksum)
+ @checksum = checksum
+ end
+
+ def request_started?
+ @mutex.synchronize do
+ return @request_started
+ end
+ end
+
+ def stream_response(stream)
+ stream.response.read_body do |chunk|
+ # send chunks to our consumer
+ @chunk_queue << chunk
+ end
+ # it's over guys
+ @chunk_queue << nil
+ end
+end
\ No newline at end of file
diff --git a/lib/puppet/network/formats.rb b/lib/puppet/network/formats.rb
index 8b9b68d..19053ec 100644
--- a/lib/puppet/network/formats.rb
+++ b/lib/puppet/network/formats.rb
@@ -143,6 +143,10 @@ Puppet::Network::FormatHandler.create(:raw, :mime => "application/x-raw", :weigh
raise NotImplementedError
end
+ def support_stream?
+ true
+ end
+
# LAK:NOTE The format system isn't currently flexible enough to handle
# what I need to support raw formats just for individual instances (rather
# than both individual and collections), but we don't yet have enough data
diff --git a/lib/puppet/type/file.rb b/lib/puppet/type/file.rb
index 2f5b5df..3d3b8b6 100644
--- a/lib/puppet/type/file.rb
+++ b/lib/puppet/type/file.rb
@@ -710,7 +710,11 @@ module Puppet
if validate = validate_checksum?
# Use the appropriate checksum type -- md5, md5lite, etc.
sumtype = property(:checksum).checktype
- checksum ||= "{#{sumtype}}" + property(:checksum).send(sumtype, content)
+ if content.is_a?(String)
+ checksum ||= "{#{sumtype}}" + property(:checksum).send(sumtype, content)
+ else
+ content.checksum = property(:checksum).send("#{sumtype}_stream")
+ end
end
remove_existing(:file)
@@ -729,7 +733,18 @@ module Puppet
umask = mode ? 000 : 022
Puppet::Util.withumask(umask) do
- File.open(path, File::CREAT|File::WRONLY|File::TRUNC, mode) { |f| f.print content }
+ File.open(path, File::CREAT|File::WRONLY|File::TRUNC, mode) do |f|
+ if content.is_a?(String)
+ f.print content
+ else # we're in front of a stream
+ newchecksum = content.stream do |c|
+ f.print c
+ end
+ if validate and newchecksum
+ checksum ||= "{#{sumtype}}" + newchecksum.checksum
+ end
+ end
+ end
end
# And put our new file in place
diff --git a/spec/integration/network/deferred_response.rb b/spec/integration/network/deferred_response.rb
new file mode 100644
index 0000000..01cb6ba
--- /dev/null
+++ b/spec/integration/network/deferred_response.rb
@@ -0,0 +1,34 @@
+#!/usr/bin/env ruby
+
+require File.dirname(__FILE__) + '/../../spec_helper'
+
+require 'puppet/network/deferred_response'
+
+describe Puppet::Network::DeferredResponse do
+ before(:each) do
+ @request = stub_everything 'request'
+ @uri = stub_everything 'uri'
+ @headers = stub_everything 'headers'
+
+ @request = stub_everything 'request'
+ @http = stub_everything 'http'
+ @rest = stub_everything 'rest', :network => @http
+
+ @response = stub_everything 'response'
+ @http.stubs(:request_get).yields(@response)
+
+ @stream = stub_everything 'stream', :response => @response
+ @response.stubs(:read_body).multiple_yields("chunk1", "chunk2")
+ @content = stub_everything 'content', :content => @stream
+
+ @rest.stubs(:deserialize).returns(@content)
+
+ @deferred = Puppet::Network::DeferredResponse.new(@request, @uri, @headers, @rest)
+ end
+
+ it "should pass chunks to the main thread" do
+ @deferred.stream do |c|
+ c.should match /^chunk\d/
+ end
+ end
+end
\ No newline at end of file
diff --git a/spec/unit/file_serving/content_stream.rb b/spec/unit/file_serving/content_stream.rb
new file mode 100644
index 0000000..b0d1874
--- /dev/null
+++ b/spec/unit/file_serving/content_stream.rb
@@ -0,0 +1,29 @@
+#!/usr/bin/env ruby
+
+require File.dirname(__FILE__) + '/../../spec_helper'
+
+require 'puppet/file_serving/content_stream'
+
+describe Puppet::FileServing::ContentStream do
+ it "should be a subclass of Content" do
+ Puppet::FileServing::ContentStream.superclass.should equal(Puppet::FileServing::Content)
+ end
+
+ it "should be able to create a content instance" do
+ Puppet::FileServing::ContentStream.should respond_to(:create)
+ end
+
+ it "should return the content itself when converting from raw" do
+ content = stub 'content'
+ Puppet::FileServing::ContentStream.from_raw(content).should == content
+ end
+
+ it "should create an instance with a fake file name and correct content when converting from raw" do
+ instance = mock 'instance'
+ Puppet::FileServing::ContentStream.expects(:new).with("/this/is/a/fake/path").returns instance
+
+ instance.expects(:content=).with "foo/bar"
+
+ Puppet::FileServing::ContentStream.create("foo/bar").should equal(instance)
+ end
+end
diff --git a/spec/unit/indirector/file_content/rest.rb b/spec/unit/indirector/file_content/rest.rb
index afb674e..4fa0d20 100755
--- a/spec/unit/indirector/file_content/rest.rb
+++ b/spec/unit/indirector/file_content/rest.rb
@@ -2,10 +2,32 @@
require File.dirname(__FILE__) + '/../../../spec_helper'
-require 'puppet/indirector/file_content'
+require 'puppet/indirector/file_content/rest'
+require 'puppet/file_serving/content'
+require 'puppet/file_serving/content_stream'
-describe "Puppet::Indirector::Content::Rest" do
- it "should add the node's cert name to the arguments"
+describe "Puppet::Indirector::FileContent::Rest" do
+ it "should be a subclass of Puppet::Indirector::REST" do
+ Puppet::Indirector::FileContent::Rest.superclass.should equal(Puppet::Indirector::REST)
+ end
- it "should set the content type to text/plain"
+ describe "when finding" do
+ before(:each) do
+ @request = stub_everything 'request'
+ Puppet::FileServing::Content.terminus_class = :rest
+ @indirector = Puppet::Indirector::FileContent::Rest.new
+ @indirector.stubs(:indirection2uri).with(@request).returns("/here")
+ Puppet::Network::DeferredResponse.stubs(:new)
+ end
+
+ it "should return a Puppet::FileServing::ContentStream as model" do
+ @indirector.find(@request).should be_instance_of(Puppet::FileServing::ContentStream)
+ end
+
+ it "should return a content stream wrapping a deferred response" do
+ content = stub_everything 'content'
+ Puppet::Network::DeferredResponse.expects(:new).returns content
+ @indirector.find(@request).content.should == content
+ end
+ end
end
diff --git a/spec/unit/network/deferred_response.rb b/spec/unit/network/deferred_response.rb
new file mode 100644
index 0000000..858229e
--- /dev/null
+++ b/spec/unit/network/deferred_response.rb
@@ -0,0 +1,182 @@
+#!/usr/bin/env ruby
+
+require File.dirname(__FILE__) + '/../../spec_helper'
+
+require 'puppet/network/deferred_response'
+
+describe Puppet::Network::DeferredResponse do
+ before(:each) do
+ @request = stub_everything 'request'
+ @uri = stub_everything 'uri'
+ @headers = stub_everything 'headers'
+
+ @request = stub_everything 'request'
+ @http = stub_everything 'http', :request_get => @request
+ @rest = stub_everything 'rest', :network => @http
+
+ @chunk_queue = stub_everything 'chunk_queue'
+ SizedQueue.expects(:new).returns(@chunk_queue)
+
+ @got_response = stub_everything 'got_response'
+ ConditionVariable.stubs(:new).returns(@got_response)
+
+ @mutex = stub_everything 'mutex'
+ @mutex.stubs(:synchronize).yields
+ Mutex.expects(:new).returns(@mutex)
+
+ @deferred = Puppet::Network::DeferredResponse.new(@request, @uri, @headers, @rest)
+ end
+
+ it "should be a 'stream'" do
+ @deferred.should be_stream
+ end
+
+ it "should be possible to set a checksum" do
+ @deferred.should be_respond_to(:checksum=)
+ end
+
+ describe "when getting length" do
+ before(:each) do
+ @response = stub_everything 'response'
+ @deferred.response = @response
+ @deferred.stubs(:start_request)
+ end
+
+ it "should start the network request" do
+ @deferred.expects(:start_request)
+ @deferred.length
+ end
+
+ it "should return the response content length" do
+ @response.expects(:content_length).returns 10
+ @deferred.length.should == 10
+ end
+ end
+
+ describe "when streaming" do
+ before(:each) do
+ @deferred.stubs(:start_request)
+ end
+
+ it "should start the network request" do
+ @deferred.expects(:start_request)
+ @deferred.stream
+ end
+
+ it "should fetch one chunk from the chunk queue" do
+ @chunk_queue.expects(:pop).twice.returns("chunk", nil)
+ @deferred.stream { |c| }
+ end
+
+ it "should fetch all chunks from the chunk queue until nil" do
+ @chunk_queue.expects(:pop).times(3).returns("chunk1", "chunk2", nil)
+ @deferred.stream { |c|
+ c.should match /^chunk\d/
+ }
+ end
+
+ it "should give the chunk to the current checksum stream" do
+ checksum = stub_everything 'checksum'
+ @deferred.checksum = checksum
+
+ @chunk_queue.expects(:pop).twice.returns("chunk", nil)
+ checksum.expects(:update).with("chunk")
+
+ @deferred.stream { |c| }
+ end
+
+ it "should yield the chunks to the given block" do
+ @chunk_queue.expects(:pop).twice.returns("chunk", nil)
+
+ @deferred.stream { |c|
+ c.should == "chunk"
+ }
+ end
+
+ it "should return the checksum" do
+ checksum = stub_everything 'checksum'
+ @deferred.checksum = checksum
+
+ @deferred.stream.should == checksum
+ end
+ end
+
+ describe "when issuing the network request" do
+ before(:each) do
+ Thread.stubs(:new).yields(nil)
+ @stream = stub_everything 'stream'
+ @content = stub_everything 'content', :content => @stream
+ @response = stub_everything 'response'
+ @content.stubs(:response).returns(@response)
+ end
+
+ it "should return early if the request has already been started" do
+ @deferred.expects(:request_started?).returns(true)
+ @rest.expects(:network).never
+ @deferred.start_request
+ end
+
+ it "should launch the request in a new Thread" do
+ Thread.expects(:new).yields
+ @rest.expects(:network).with(@request).returns(@http)
+ @http.expects(:request_get).with(@uri, @headers)
+ @deferred.start_request
+ end
+
+ describe "and the network thread" do
+ before(:each) do
+ Thread.expects(:new).yields
+ @rest.stubs(:network).with(@request).returns(@http)
+ @rest.stubs(:deserialize).returns(@content)
+ @http.stubs(:request_get).with(@uri, @headers).yields(@response)
+ @deferred.stubs(:stream_response)
+ end
+
+ it "should let everyone know the request has started" do
+ @deferred.start_request
+ @deferred.request_started?.should be true
+ end
+
+ it "should signal other threads that the response is ready" do
+ @got_response.expects(:signal)
+ @deferred.start_request
+ end
+
+ it "should store the current response" do
+ @deferred.start_request
+ @deferred.response.should == @response
+ end
+
+ it "should deserialize the response" do
+ @rest.expects(:deserialize).with(@response).returns(@content)
+ @deferred.start_request
+ end
+
+ it "should stream the response content" do
+ @deferred.expects(:stream_response).with(@stream)
+ @deferred.start_request
+ end
+ end
+
+ describe "and the other thread" do
+ it "should finally wait for the request to be started" do
+ @got_response.expects(:wait)
+ @deferred.start_request
+ end
+ end
+
+ describe "when streaming the response content" do
+ it "should put each chunk in the chunk queue" do
+ @response.expects(:read_body).multiple_yields("chunk1","chunk2")
+ @chunk_queue.expects(:<<).with("chunk1").then.with("chunk2")
+ @deferred.stream_response(@content)
+ end
+
+ it "should enqueue nil at the end" do
+ @chunk_queue.expects(:<<).with(nil)
+ @deferred.stream_response(@content)
+ end
+ end
+ end
+
+end
\ No newline at end of file
diff --git a/spec/unit/network/formats.rb b/spec/unit/network/formats.rb
index 208c6f4..c1f74f9 100755
--- a/spec/unit/network/formats.rb
+++ b/spec/unit/network/formats.rb
@@ -280,6 +280,10 @@ describe "Puppet Network Format" do
@format.should be_supported(String)
end
+ it "should always support streaming" do
+ @format.should be_support_stream
+ end
+
it "should fail if its multiple_render method is used" do
lambda { @format.render_multiple("foo") }.should raise_error(NotImplementedError)
end
diff --git a/spec/unit/type/file.rb b/spec/unit/type/file.rb
index 1b3fe6a..49c1d34 100755
--- a/spec/unit/type/file.rb
+++ b/spec/unit/type/file.rb
@@ -774,5 +774,37 @@ describe Puppet::Type.type(:file) do
lambda { file.write("something", :content) }.should raise_error(Puppet::Error)
end
+
+ it "should stream streamable content" do
+ file = Puppet::Type::File.new(:name => "/my/file", :backup => "puppet")
+ f = stub_everything 'file'
+ File.stubs(:open).yields(f)
+ File.stubs(:rename)
+ content = stub_everything 'content'
+
+ content.expects(:stream).yields("chunk")
+ f.expects(:print).with("chunk")
+
+ file.write(content, :content)
+ end
+
+ it "should use the streamed checksum" do
+ file = Puppet::Type::File.new(:name => "/my/file", :backup => "puppet")
+ f = stub_everything 'file'
+ File.stubs(:open).yields(f)
+ File.stubs(:rename)
+ file.stubs(:validate_checksum?).returns(true)
+ file.stubs(:fail_if_checksum_is_wrong)
+ content = stub_everything 'content'
+ property = stub_everything 'property'
+ file.stubs(:property).returns property
+ property.stubs(:checktype).returns(:md5)
+ checksum = stub_everything 'checksum', :checksum => "DEADBEEF"
+ content.stubs(:stream).returns(checksum)
+
+ file.expects(:setchecksum).with("{md5}DEADBEEF")
+
+ file.write(content, :content)
--
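For readers following the DeferredResponse part of the patch, the hand-off between the network thread and the consumer can be sketched in isolation roughly as below. This is a minimal sketch and not the patch's code: the `chunks` array and the StringIO are stand-ins (assumptions) for response.read_body and the destination file, and the digest type is chosen arbitrarily.

```ruby
require 'thread'
require 'digest/md5'
require 'stringio'

# Hypothetical chunk source standing in for response.read_body.
chunks = ["chunk1", "chunk2", "chunk3"]

# A queue of size 1: the producer blocks until the consumer has taken
# the previous chunk, so at most one chunk is buffered at a time.
queue = SizedQueue.new(1)

producer = Thread.new do
  chunks.each { |chunk| queue << chunk }
  queue << nil # nil marks the end of the stream
end

digest = Digest::MD5.new
out = StringIO.new # stands in for the file being written

# Consumer side: pop until the nil sentinel, checksumming on the fly.
while (chunk = queue.pop)
  digest.update(chunk)
  out.print chunk
end
producer.join
```

The point of the bounded queue is that memory use stays proportional to one chunk, whatever the size of the whole body.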
I don't see how the following could work:
def find(request)
- return nil unless result = deserialize(network(request).get(indirection2uri(request), headers))
+ return nil unless result = network(request).request_get(indirection2uri(request), headers) do |response|
+ return deserialize(response)
+ end
result.name = request.key
result
end
Specifically, if the block containing "return deserialize(response)"
is called/yielded "live" I believe it should exit from the find method
without the result name being set to the result key, whereas if it is
stashed and called later (after find has returned) it should produce a
LocalJumpError.
If this code works, I'm obviously missing something. Any idea what?
-- Markus
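The two behaviours described above can be reproduced outside Puppet with a small sketch. All method names here are hypothetical stand-ins; nothing below is Puppet or Net::HTTP code.

```ruby
# Hypothetical stand-in for a method that yields a response to its block.
def yielding_method
  yield "response"
  :value_of_yielding_method # not reached when the block executes `return`
end

def find_with_return_in_block
  result = yielding_method do |response|
    # `return` in a block returns from the method that defines the block,
    # here find_with_return_in_block, not from yielding_method.
    return "returned #{response}"
  end
  "after the block, result=#{result.inspect}" # skipped in the live case
end

# A block stashed and called only after its defining method has returned.
def stash_block
  proc { return :too_late }
end

live = find_with_return_in_block

stashed_error = begin
  stash_block.call
rescue LocalJumpError => e
  e.class
end
```

In the live case the code after the block never runs, which matches the observation that the result name would not be set.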
What I understood is that returning a value in this block would end up
in "result". And indeed I was able to deserialize catalogs, certificates
and such, so I suppose it is working fine.
> Specifically, if the block containing "return deserialize(response)"
> is called/yielded "live" I believe it should exit from the find method
> without the result name being set to the result key, whereas if it is
> stashed and called later (after find has returned) it should produce a
> LocalJumpError.
To me it would return from the method that yields it (i.e. request_get),
but that's a wild guess.
> If this code works, I'm obviously missing something. Any idea what?
Your ruby is way better than mine, so I'd tend to trust what you say :-)
So you would write the code like this (sorry for the wrapping)?
network(request).request_get(indirection2uri(request), headers) do
|response|
result = deserialize(response)
end
return nil unless result
result.name = request.key
result
If that's preferable, this is an easy change.
--
Brice Figureau
My Blog: http://www.masterzen.fr/
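One Ruby detail matters for the block-assignment form: a variable first assigned inside a block is local to that block, so `result` has to exist before the block for the assignment to be visible afterwards. The sketch below assumes the snippet as posted is abridged; `fake_request_get` and `fake_deserialize` are hypothetical stand-ins, not Puppet or Net::HTTP methods.

```ruby
# Hypothetical stand-in for request_get: yields a response, then finishes.
def fake_request_get
  yield "serialized"
  :finished
end

# Hypothetical stand-in for deserialize.
def fake_deserialize(response)
  response.upcase
end

def find_sketch
  result = nil # declared before the block, so the block assigns to it
  fake_request_get do |response|
    result = fake_deserialize(response)
  end
  return nil unless result
  result
end

def find_without_declaration
  fake_request_get do |response|
    inner = fake_deserialize(response) # `inner` is local to the block
  end
  defined?(inner) # nil: the name is not visible outside the block
end
```

With the declaration in place, the block runs to completion and the yielding method finishes normally before the result is used.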
So my code is incorrect because it would skip the end of
request_get, which certainly closes/finishes the network side of things.
Your ruby is way better than mine, so I'd tend to trust what you say :-)
Obviously the response gets deserialized, so I'd say it works.
It's just that, based on your sample, it seems I'm short-circuiting the
Net::HTTP "request shutdown", which is certainly bad :-)
I'll fix this soon in my repository.
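The short-circuit can be illustrated with a generic yield-then-clean-up method. This is only a sketch of the control flow: `fake_request_get` is hypothetical, and whether Net::HTTP does its shutdown after the yield or in an ensure clause is not something this example claims.

```ruby
LOG = []

# Hypothetical request method with the same yield-then-clean-up shape.
def fake_request_get
  LOG << :open
  yield "response"
  LOG << :after_yield # skipped if the block returns from its caller
  :done
ensure
  LOG << :ensure # runs even on a non-local return
end

def find_with_return
  fake_request_get { |response| return response }
end

def find_with_assignment
  result = nil
  fake_request_get { |response| result = response }
  result
end

find_with_return
log_with_return = LOG.dup
LOG.clear
find_with_assignment
log_with_assignment = LOG.dup
```

Any clean-up written as plain code after the yield is lost when the block returns early; only ensure clauses survive.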
I rewrote this part the way I described earlier, and it still works,
except that it wasn't doing the result.name= assignment.
That allowed me to discover that result.name= was failing hard when
trying to fetch file metadata (those have no "name"). This is also fixed
in the new patch revision.
[...]
Hi Brice (and Markus, I guess),
What's the status of this patch series? I'd love to get this into
rowlf if at all possible, and I'll work with you to get it done if you
need some backup on it.
--
I respect faith, but doubt is what gets you an education.
-- Wilson Mizner
---------------------------------------------------------------------
Luke Kanies -|- http://reductivelabs.com -|- +1(615)594-8199