[PATCH/puppet 0/5] Puppetd streaming patch

Brice Figureau

Feb 7, 2010, 1:24:48 PM
to puppe...@googlegroups.com
Hi,

Following one of my blog posts[1] about Puppetd memory consumption, I
decided to also try to add my stone to the edifice.

Since #2892 is almost fixed by now, I decided to spend my time on
something else:
1) json catalog (and its deserialization)
2) sourced file content

Both can consume a lot of memory for a short time (memory which, unfortunately,
stays allocated afterwards; see my blog post for some explanations).

So this patch brings to puppet:

1) support for a new JSON parser: Yajl-ruby[2], which is faster and uses
less memory than the regular json parser. Yajl-ruby is available as a gem.
To activate it, use --preferred_serialization_format=yajl

2) use Yajl-ruby in "streaming" mode: puppetd never reads the whole response
body into RAM, but instead feeds it to Yajl-ruby chunk by chunk. This way, the
JSON-serialized catalog is never fully stored in RAM.

3) allow the file resource source system to stream file content to disk instead
of reading the whole response body into RAM and then dumping it to a file.
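
To illustrate points 2) and 3), here is a minimal sketch of the chunk-by-chunk idea. It is not code from the patch: `stream_to_io` is a hypothetical helper, and the array of strings stands in for an HTTP body delivered in pieces. Only one chunk is held in memory at a time while the bytes are written out and the checksum is updated.

```ruby
require 'digest/md5'
require 'stringio'

# Hypothetical helper (not part of the patch): writes each chunk to +io+
# as it arrives and updates a running digest along the way.
def stream_to_io(chunks, io, digest = Digest::MD5.new)
  bytes = 0
  chunks.each do |chunk|
    io.write(chunk)          # goes straight to the target, no accumulation
    digest << chunk          # checksum is computed incrementally
    bytes += chunk.bytesize
  end
  [bytes, digest.hexdigest]
end

# Stand-in for a response body arriving piece by piece.
chunks = ["hello ", "streamed ", "world"]

target = StringIO.new        # a real caller would pass an open File here
bytes, sum = stream_to_io(chunks, target)
puts "wrote #{bytes} bytes, md5 #{sum}"
```

With a real connection, the chunks would presumably come from Net::HTTPResponse#read_body called with a block, and the IO would be the target file opened for writing.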

This patch has been lightly tested and seems to work, YMMV, though. I also
tried to keep the patch as small as possible.

I took some figures while puppetd was sourcing a 100MiB file:

                Start     Peak
0.25.4  VSZ    612860   925772
patched VSZ    611936   644104

0.25.4  RES     12988   298104
patched RES     11704    43536

Which I think speaks for itself :-)

Now to be clear: this patch is a quick and dirty hack. I don't expect it to be
merged as is, so I'm waiting for reviews, tests, comments, and flames. I even
think this patch brings a whole new series of issues and bugs :-)

Note: this patch introduces a new issue regarding #2892. Since we no longer have
the whole serialized catalog in memory, it will be hard to dump it as is
to disk for the local cache. Something clever will need to be done (like streaming
the cache...).
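
One possible shape for "streaming the cache", as a rough sketch only: every chunk is appended to the cache file before it is handed to the parser, so the serialized catalog still reaches disk without being assembled in memory. `tee_chunks` and the parser stand-in are hypothetical names, not part of this patch.

```ruby
require 'stringio'

# Hypothetical helper: copy each chunk to cache_io, then pass it on
# to the caller's block (which would feed a streaming parser).
def tee_chunks(chunks, cache_io)
  chunks.each do |chunk|
    cache_io.write(chunk)
    yield chunk
  end
end

cache  = StringIO.new   # stands in for the local cache file
parsed = []             # stands in for a streaming parser's input

tee_chunks(['{"name":', '"catalog"}'], cache) { |chunk| parsed << chunk }

puts cache.string       # the serialized document, written piece by piece
```

A failed parse would leave a partial cache file behind, so a real implementation would probably write to a temporary file and rename it on success.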

Thanks,
Brice

[1]: http://www.masterzen.fr/2010/01/28/puppet-memory-usage-not-a-fatality/
[2]: http://github.com/brianmario/yajl-ruby

Brice Figureau (5):
Add stream format capability
Add Yajl-ruby json parser format
Add a way to capture checksums on a "stream"
Let the rest client know how to handle stream response
Allows puppet:// content client streaming

lib/puppet/feature/yajl.rb | 24 ++++
lib/puppet/file_serving/content_stream.rb | 23 +++
lib/puppet/indirector/file_content/rest.rb | 10 ++-
lib/puppet/indirector/rest.rb | 14 ++-
lib/puppet/network/deferred_response.rb | 94 +++++++++++++
lib/puppet/network/format.rb | 4 +
lib/puppet/network/format_handler.rb | 23 +++-
lib/puppet/network/formats.rb | 112 +++++++++++++++
lib/puppet/network/response_stream.rb | 45 ++++++
lib/puppet/resource.rb | 2 +-
lib/puppet/type/file.rb | 19 +++-
lib/puppet/util/checksum_stream.rb | 20 +++
lib/puppet/util/checksums.rb | 12 ++
spec/integration/network/deferred_response.rb | 34 +++++
spec/integration/network/formats.rb | 82 +++++++++++-
spec/unit/file_serving/content_stream.rb | 29 ++++
spec/unit/indirector/file_content/rest.rb | 30 ++++-
spec/unit/indirector/rest.rb | 45 ++++---
spec/unit/network/deferred_response.rb | 182 +++++++++++++++++++++++++
spec/unit/network/format.rb | 4 +
spec/unit/network/format_handler.rb | 33 +++++
spec/unit/network/formats.rb | 110 +++++++++++++++-
spec/unit/network/response_stream.rb | 60 ++++++++
spec/unit/type/file.rb | 32 +++++
spec/unit/util/checksum_stream.rb | 23 +++
spec/unit/util/checksums.rb | 12 ++
26 files changed, 1040 insertions(+), 38 deletions(-)
create mode 100644 lib/puppet/feature/yajl.rb
create mode 100644 lib/puppet/file_serving/content_stream.rb
create mode 100644 lib/puppet/network/deferred_response.rb
create mode 100644 lib/puppet/network/response_stream.rb
create mode 100644 lib/puppet/util/checksum_stream.rb
create mode 100644 spec/integration/network/deferred_response.rb
create mode 100644 spec/unit/file_serving/content_stream.rb
create mode 100644 spec/unit/network/deferred_response.rb
create mode 100644 spec/unit/network/response_stream.rb
create mode 100644 spec/unit/util/checksum_stream.rb

Brice Figureau

Feb 7, 2010, 1:24:51 PM
to puppe...@googlegroups.com
Signed-off-by: Brice Figureau <brice-...@daysofwonder.com>
---
lib/puppet/util/checksum_stream.rb | 20 ++++++++++++++++++++
lib/puppet/util/checksums.rb | 12 ++++++++++++
spec/unit/util/checksum_stream.rb | 23 +++++++++++++++++++++++
spec/unit/util/checksums.rb | 12 ++++++++++++
4 files changed, 67 insertions(+), 0 deletions(-)
create mode 100644 lib/puppet/util/checksum_stream.rb
create mode 100644 spec/unit/util/checksum_stream.rb

diff --git a/lib/puppet/util/checksum_stream.rb b/lib/puppet/util/checksum_stream.rb
new file mode 100644
index 0000000..0add5fd
--- /dev/null
+++ b/lib/puppet/util/checksum_stream.rb
@@ -0,0 +1,20 @@
+
+class Puppet::Util::ChecksumStream
+ attr_accessor :digest
+
+ def initialize(digest)
+ @digest = digest.reset
+ end
+
+ def update(chunk)
+ digest << chunk
+ end
+
+ def checksum
+ digest.hexdigest
+ end
+
+ def to_s
+ "checksum: #{digest}"
+ end
+end
\ No newline at end of file
diff --git a/lib/puppet/util/checksums.rb b/lib/puppet/util/checksums.rb
index 98bf5de..1b73a86 100644
--- a/lib/puppet/util/checksums.rb
+++ b/lib/puppet/util/checksums.rb
@@ -1,3 +1,5 @@
+require 'puppet/util/checksum_stream'
+
# A stand-alone module for calculating checksums
# in a generic way.
module Puppet::Util::Checksums
@@ -34,6 +36,11 @@ module Puppet::Util::Checksums
md5_file(filename, true)
end

+ def md5_stream
+ require 'digest/md5'
+ Puppet::Util::ChecksumStream.new(Digest::MD5.new())
+ end
+
# Return the :mtime timestamp of a file.
def mtime_file(filename)
File.stat(filename).send(:mtime)
@@ -63,6 +70,11 @@ module Puppet::Util::Checksums
sha1_file(filename, true)
end

+ def sha1_stream
+ require 'digest/sha1'
+ Puppet::Util::ChecksumStream.new(Digest::SHA1.new())
+ end
+
# Return the :ctime of a file.
def ctime_file(filename)
File.stat(filename).send(:ctime)
diff --git a/spec/unit/util/checksum_stream.rb b/spec/unit/util/checksum_stream.rb
new file mode 100644
index 0000000..0ace637
--- /dev/null
+++ b/spec/unit/util/checksum_stream.rb
@@ -0,0 +1,23 @@
+
+require File.dirname(__FILE__) + '/../../spec_helper'
+
+require 'puppet/util/checksum_stream'
+
+describe Puppet::Util::ChecksumStream do
+
+ before(:each) do
+ @digest = stub 'digest'
+ @digest.stubs(:reset).returns(@digest)
+ @sum = Puppet::Util::ChecksumStream.new(@digest)
+ end
+
+ it "should add to the digest when update" do
+ @digest.expects(:<<).with("content")
+ @sum.update("content")
+ end
+
+ it "should produce the final checksum" do
+ @digest.expects(:hexdigest).returns("DEADBEEF")
+ @sum.checksum.should == "DEADBEEF"
+ end
+end
\ No newline at end of file
diff --git a/spec/unit/util/checksums.rb b/spec/unit/util/checksums.rb
index d31d7a0..a27229e 100755
--- a/spec/unit/util/checksums.rb
+++ b/spec/unit/util/checksums.rb
@@ -15,6 +15,7 @@ describe Puppet::Util::Checksums do

content_sums = [:md5, :md5lite, :sha1, :sha1lite]
file_only = [:ctime, :mtime]
+ stream = [:md5_stream, :sha1_stream]

content_sums.each do |sumtype|
it "should be able to calculate %s sums from strings" % sumtype do
@@ -28,6 +29,12 @@ describe Puppet::Util::Checksums do
end
end

+ stream.each do |sumtype|
+ it "should be able to calculate %s sums from streams" % sumtype do
+ @summer.should be_respond_to(sumtype.to_s)
+ end
+ end
+
it "should have a method for stripping a sum type from an existing checksum" do
@summer.sumtype("{md5}asdfasdfa").should == "md5"
end
@@ -62,6 +69,11 @@ describe Puppet::Util::Checksums do

@summer.send(sum.to_s + "_file", file).should == :mydigest
end
+
+ it "should use #{klass} to seed checksum stream" do
+ @summer.send("#{sum}_stream").should be_instance_of(Puppet::Util::ChecksumStream)
+ @summer.send("#{sum}_stream").digest.should be_instance_of(klass)
+ end
end
end

--
1.6.6.1
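
For readers who want to try the ChecksumStream idea outside Puppet, here is a standalone sketch. The class is re-declared without the Puppet::Util namespace purely so the snippet runs on its own; the chunk list is made up for the example.

```ruby
require 'digest/md5'

# Standalone copy of the class from the patch, declared outside
# Puppet::Util only for the sake of this example.
class ChecksumStream
  attr_accessor :digest

  def initialize(digest)
    @digest = digest.reset   # Digest#reset returns the digest itself
  end

  def update(chunk)
    digest << chunk
  end

  def checksum
    digest.hexdigest
  end
end

sum = ChecksumStream.new(Digest::MD5.new)
%w[foo bar baz].each { |chunk| sum.update(chunk) }

# Feeding chunks one by one gives the same digest as hashing the whole string.
puts sum.checksum == Digest::MD5.hexdigest("foobarbaz")
```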

Brice Figureau

Feb 7, 2010, 1:24:52 PM
to puppe...@googlegroups.com
This commit allows streamed deserialization if the format supports
streaming.

Signed-off-by: Brice Figureau <brice-...@daysofwonder.com>
---

lib/puppet/indirector/rest.rb | 14 +++++---
lib/puppet/network/response_stream.rb | 45 ++++++++++++++++++++++++
spec/unit/indirector/rest.rb | 45 +++++++++++++++----------
spec/unit/network/response_stream.rb | 60 +++++++++++++++++++++++++++++++++
4 files changed, 141 insertions(+), 23 deletions(-)
create mode 100644 lib/puppet/network/response_stream.rb
create mode 100644 spec/unit/network/response_stream.rb

diff --git a/lib/puppet/indirector/rest.rb b/lib/puppet/indirector/rest.rb
index a89e986..d3fb5ae 100644
--- a/lib/puppet/indirector/rest.rb
+++ b/lib/puppet/indirector/rest.rb
@@ -3,6 +3,7 @@ require 'uri'

require 'puppet/network/http_pool'
require 'puppet/network/http/api/v1'
+require 'puppet/network/response_stream'

# Access objects via REST
class Puppet::Indirector::REST < Puppet::Indirector::Terminus
@@ -44,10 +45,11 @@ class Puppet::Indirector::REST < Puppet::Indirector::Terminus
content_type = response['content-type'].gsub(/\s*;.*$/,'') # strip any appended charset

# Convert the response to a deserialized object.
+ response = Puppet::Network::ResponseStream.new(response)
if multiple
- model.convert_from_multiple(content_type, response.body)
+ model.convert_from_multiple(content_type, response)
else
- model.convert_from(content_type, response.body)
+ model.convert_from(content_type, response)
end
else
# Raise the http error if we didn't get a 'success' of some kind.
@@ -66,14 +68,16 @@ class Puppet::Indirector::REST < Puppet::Indirector::Terminus
end

def find(request)
- return nil unless result = deserialize(network(request).get(indirection2uri(request), headers))
+ return nil unless result = network(request).request_get(indirection2uri(request), headers) do |response|
+ return deserialize(response)
+ end
result.name = request.key
result
end

def search(request)
- unless result = deserialize(network(request).get(indirection2uri(request), headers), true)
- return []
+ return [] unless result = network(request).request_get(indirection2uri(request), headers) do |response|
+ return deserialize(response, true)
end
return result
end
diff --git a/lib/puppet/network/response_stream.rb b/lib/puppet/network/response_stream.rb
new file mode 100644
index 0000000..4b7c772
--- /dev/null
+++ b/lib/puppet/network/response_stream.rb
@@ -0,0 +1,45 @@
+
+require 'net/http'
+
+# This is a wrapper around either a Net::HTTPResponse or a String
+# allowing the same interface for the exterior world.
+class Puppet::Network::ResponseStream
+
+ attr_accessor :response
+
+ [:code, :body].each do |m|
+ define_method(m) do
+ response.send(m)
+ end
+ end
+
+ def initialize(content)
+ @response = content
+ end
+
+ def stream?
+ response.is_a?(Net::HTTPResponse)
+ end
+
+ def content
+ response.body
+ end
+
+ def length
+ if stream?
+ response.content_length
+ else
+ response.length
+ end
+ end
+
+ def stream
+ if stream?
+ response.read_body do |r|
+ yield r
+ end
+ else
+ yield response.body
+ end
+ end
+end
\ No newline at end of file

diff --git a/spec/unit/indirector/rest.rb b/spec/unit/indirector/rest.rb
index d12e3c6..fb6c03c 100755
--- a/spec/unit/indirector/rest.rb
+++ b/spec/unit/indirector/rest.rb
@@ -123,35 +123,42 @@ describe Puppet::Indirector::REST do
}

it "should return the results of converting from the format specified by the content-type header if the response code is in the 200s" do
- @model.expects(:convert_from).with("myformat", "mydata").returns "myobject"
-
response = mock 'response'
response.stubs(:[]).with("content-type").returns "myformat"
response.stubs(:body).returns "mydata"
response.stubs(:code).returns "200"
+ stream = stub 'stream'
+ Puppet::Network::ResponseStream.expects(:new).with(response).returns(stream)
+
+ @model.expects(:convert_from).with("myformat", stream).returns "myobject"

@searcher.deserialize(response).should == "myobject"
end

it "should convert and return multiple instances if the return code is in the 200s and 'multiple' is specified" do
- @model.expects(:convert_from_multiple).with("myformat", "mydata").returns "myobjects"
-
response = mock 'response'
response.stubs(:[]).with("content-type").returns "myformat"
response.stubs(:body).returns "mydata"
response.stubs(:code).returns "200"
+ stream = stub 'stream'
+ Puppet::Network::ResponseStream.expects(:new).with(response).returns(stream)
+
+ @model.expects(:convert_from_multiple).with("myformat", stream).returns "myobjects"

@searcher.deserialize(response, true).should == "myobjects"
end

it "should strip the content-type header to keep only the mime-type" do
- @model.expects(:convert_from).with("text/plain", "mydata").returns "myobject"
-
response = mock 'response'
response.stubs(:[]).with("content-type").returns "text/plain; charset=utf-8"
response.stubs(:body).returns "mydata"
response.stubs(:code).returns "200"

+ stream = stub 'stream'
+ Puppet::Network::ResponseStream.expects(:new).with(response).returns(stream)
+
+ @model.expects(:convert_from).with("text/plain", stream).returns "myobject"
+
@searcher.deserialize(response)
end
end
@@ -186,7 +193,8 @@ describe Puppet::Indirector::REST do

describe "when doing a find" do
before :each do
- @connection = stub('mock http connection', :get => @response)
+ @connection = stub('mock http connection')
+ @connection.stubs(:request_get).yields(@response)
@searcher.stubs(:network).returns(@connection) # neuter the network connection

# Use a key with spaces, so we can test escaping
@@ -195,27 +203,27 @@ describe Puppet::Indirector::REST do

it "should call the GET http method on a network connection" do
@searcher.expects(:network).returns @connection
- @connection.expects(:get).returns @response
+ @connection.expects(:request_get).yields @response
@searcher.find(@request)
end

it "should deserialize and return the http response" do
- @connection.expects(:get).returns @response

instance = stub 'object', :name= => nil
@searcher.expects(:deserialize).with(@response).returns instance
+ @connection.expects(:request_get).yields(@response).returns(instance)

@searcher.find(@request).should == instance
end

it "should use the URI generated by the Handler module" do
@searcher.expects(:indirection2uri).with(@request).returns "/my/uri"
- @connection.expects(:get).with { |path, args| path == "/my/uri" }.returns(@response)
+ @connection.expects(:request_get).with { |path, args| path == "/my/uri" }.yields(@response)
@searcher.find(@request)
end

it "should provide an Accept header containing the list of supported formats joined with commas" do
- @connection.expects(:get).with { |path, args| args["Accept"] == "supported, formats" }.returns(@response)
+ @connection.expects(:request_get).with { |path, args| args["Accept"] == "supported, formats" }.yields(@response)

@searcher.model.expects(:supported_formats).returns %w{supported formats}
@searcher.find(@request)
@@ -227,7 +235,7 @@ describe Puppet::Indirector::REST do
end

it "should set the name of the resulting instance to the asked-for name" do
- @searcher.expects(:deserialize).with(@response).returns @instance
+ @connection.expects(:request_get).returns @instance
@instance.expects(:name=).with "foo bar"
@searcher.find(@request)
end
@@ -240,7 +248,8 @@ describe Puppet::Indirector::REST do

describe "when doing a search" do
before :each do
- @connection = stub('mock http connection', :get => @response)
+ @connection = stub('mock http connection')
+ @connection.stubs(:request_get).yields(@response)
@searcher.stubs(:network).returns(@connection) # neuter the network connection

@model.stubs(:convert_from_multiple)
@@ -250,12 +259,12 @@ describe Puppet::Indirector::REST do

it "should call the GET http method on a network connection" do
@searcher.expects(:network).returns @connection
- @connection.expects(:get).returns @response
+ @connection.expects(:request_get).yields @response
@searcher.search(@request)
end

it "should deserialize as multiple instances and return the http response" do
- @connection.expects(:get).returns @response
+ @connection.expects(:request_get).yields(@response).returns "myobject"
@searcher.expects(:deserialize).with(@response, true).returns "myobject"

@searcher.search(@request).should == 'myobject'
@@ -263,19 +272,19 @@ describe Puppet::Indirector::REST do

it "should use the URI generated by the Handler module" do
@searcher.expects(:indirection2uri).with(@request).returns "/mys/uri"
- @connection.expects(:get).with { |path, args| path == "/mys/uri" }.returns(@response)
+ @connection.expects(:request_get).with { |path, args| path == "/mys/uri" }.yields(@response)
@searcher.search(@request)
end

it "should provide an Accept header containing the list of supported formats joined with commas" do
- @connection.expects(:get).with { |path, args| args["Accept"] == "supported, formats" }.returns(@response)
+ @connection.expects(:request_get).with { |path, args| args["Accept"] == "supported, formats" }.yields(@response)

@searcher.model.expects(:supported_formats).returns %w{supported formats}
@searcher.search(@request)
end

it "should return an empty array if serialization returns nil" do
- @model.stubs(:convert_from_multiple).returns nil
+ @connection.expects(:request_get).returns nil

@searcher.search(@request).should == []
end
diff --git a/spec/unit/network/response_stream.rb b/spec/unit/network/response_stream.rb
new file mode 100644
index 0000000..08b23b5
--- /dev/null
+++ b/spec/unit/network/response_stream.rb
@@ -0,0 +1,60 @@
+#!/usr/bin/env ruby
+
+require File.dirname(__FILE__) + '/../../spec_helper'
+
+require 'puppet/network/response_stream'
+
+describe Puppet::Network::ResponseStream do
+ before(:each) do
+ @content = stub_everything 'content'
+ @stream = Puppet::Network::ResponseStream.new(@content)
+ end
+
+ it "should identify itself as a stream if the underlying content is an http response" do
+ @content.expects(:is_a?).with(Net::HTTPResponse).returns(true)
+ @stream.should be_stream
+ end
+
+ it "should not identify itself as a stream if the underlying content is not an http response" do
+ @content.expects(:is_a?).with(Net::HTTPResponse).returns(false)
+ @stream.should_not be_stream
+ end
+
+ it "should be able to return content length" do
+ @stream.should respond_to(:length)
+ end
+
+ it "should be able to stream content" do
+ @stream.should respond_to(:stream)
+ end
+
+ describe "when asking for content length" do
+ it "should return the content-length header if it is a stream" do
+ @stream.stubs(:stream?).returns(true)
+ @content.expects(:content_length).returns 10
+ @stream.length.should == 10
+ end
+
+ it "should return the string length otherwise" do
+ @content.expects(:length).returns 10
+ @stream.length.should == 10
+ end
+ end
+
+ describe "when streaming" do
+ it "should yield the block to the response read_body method if it is a stream" do
+ @stream.stubs(:stream?).returns(true)
+ @content.expects(:read_body).yields("chunk")
+ @stream.stream do |chunk|
+ chunk.should == "chunk"
+ end
+ end
+
+ it "should yield the full body if it is not a stream" do
+ @content.expects(:body).returns("body")
+ @stream.stream do |chunk|
+ chunk.should == "body"
+ end
+ end
+ end
+end
\ No newline at end of file

--
1.6.6.1
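
The wrapper idea in this patch can be sketched without a network connection. The snippet below is a simplified illustration, not the patch's class: `ChunkSource` and `FakeChunkedResponse` are hypothetical names, and the stream check uses duck typing (`respond_to?(:read_body)`) where the patch uses `is_a?(Net::HTTPResponse)`.

```ruby
# Hypothetical stand-in for an HTTP response whose body arrives in pieces.
class FakeChunkedResponse
  def initialize(chunks)
    @chunks = chunks
  end

  def read_body
    @chunks.each { |chunk| yield chunk }
  end
end

# Simplified wrapper: one #stream interface for chunked responses and Strings.
class ChunkSource
  def initialize(content)
    @content = content
  end

  def stream?
    @content.respond_to?(:read_body)
  end

  def stream(&block)
    if stream?
      @content.read_body(&block)   # one call to the block per chunk
    else
      yield @content               # a String is delivered as a single chunk
    end
  end
end

sources = [
  ChunkSource.new("whole body"),
  ChunkSource.new(FakeChunkedResponse.new(%w[a b c]))
]

sources.each do |source|
  received = []
  source.stream { |chunk| received << chunk }
  puts "#{source.stream? ? 'streamed' : 'buffered'}: #{received.inspect}"
end
```

Consumers only ever call `stream`, so a format that understands chunks does not need to know whether the data came from the wire or from a String.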

Brice Figureau

Feb 7, 2010, 1:24:49 PM
to puppe...@googlegroups.com
Formats can now say they support streaming (i.e. deserializing chunk by
chunk).

Signed-off-by: Brice Figureau <brice-...@daysofwonder.com>
---

lib/puppet/network/format.rb | 4 ++++
lib/puppet/network/format_handler.rb | 23 ++++++++++++++++++++---
spec/unit/network/format.rb | 4 ++++
spec/unit/network/format_handler.rb | 33 +++++++++++++++++++++++++++++++++
4 files changed, 61 insertions(+), 3 deletions(-)

diff --git a/lib/puppet/network/format.rb b/lib/puppet/network/format.rb
index d781242..b568aa7 100644
--- a/lib/puppet/network/format.rb
+++ b/lib/puppet/network/format.rb
@@ -89,6 +89,10 @@ class Puppet::Network::Format
suitable? and required_methods_present?(klass)
end

+ def support_stream?
+ false
+ end
+
def to_s
"Puppet::Network::Format[%s]" % name
end
diff --git a/lib/puppet/network/format_handler.rb b/lib/puppet/network/format_handler.rb
index ea8cf35..1435b99 100644
--- a/lib/puppet/network/format_handler.rb
+++ b/lib/puppet/network/format_handler.rb
@@ -23,7 +23,7 @@ module Puppet::Network::FormatHandler
@format = format
end

- [:intern, :intern_multiple, :render, :render_multiple, :mime].each do |method|
+ [:intern, :intern_multiple, :render, :render_multiple, :mime, :support_stream?].each do |method|
define_method(method) do |*args|
protect(method, args)
end
@@ -93,12 +93,21 @@ module Puppet::Network::FormatHandler
Puppet::Network::FormatHandler
end

+ def decapsulate(format, data)
+ format = format_handler.protected_format(format)
+
+ data = data.content if !format.support_stream? and data.respond_to?(:stream?) and data.stream?
+ [format, data]
+ end
+
def convert_from(format, data)
- format_handler.protected_format(format).intern(self, data)
+ format, data = decapsulate(format, data)
+ format.intern(self, data)
end

def convert_from_multiple(format, data)
- format_handler.protected_format(format).intern_multiple(self, data)
+ format, data = decapsulate(format, data)
+ format.intern_multiple(self, data)
end

def render_multiple(format, instances)
@@ -113,6 +122,10 @@ module Puppet::Network::FormatHandler
Puppet::Network::FormatHandler.format(name).supported?(self)
end

+ def support_stream?(name)
+ Puppet::Network::FormatHandler.format(name).support_stream?
+ end
+
def supported_formats
result = format_handler.formats.collect { |f| format_handler.format(f) }.find_all { |f| f.supported?(self) }.collect { |f| f.name }.sort do |a, b|
# It's an inverse sort -- higher weight formats go first.
@@ -164,6 +177,10 @@ module Puppet::Network::FormatHandler
def support_format?(name)
self.class.support_format?(name)
end
+
+ def support_stream?(name)
+ self.class.support_stream?(name)
+ end
end
end

diff --git a/spec/unit/network/format.rb b/spec/unit/network/format.rb
index e5a6b59..feefd68 100755
--- a/spec/unit/network/format.rb
+++ b/spec/unit/network/format.rb
@@ -66,6 +66,10 @@ describe Puppet::Network::Format do
@format.should respond_to(:supported?)
end

+ it "should be able to tell it supports streaming" do
+ @format.should respond_to(:support_stream?)
+ end
+
it "should consider a class to be supported if it has the individual and multiple methods for rendering and interning" do
@format.should be_supported(FormatRenderer)
end
diff --git a/spec/unit/network/format_handler.rb b/spec/unit/network/format_handler.rb
index 110effe..a5a68ea 100755
--- a/spec/unit/network/format_handler.rb
+++ b/spec/unit/network/format_handler.rb
@@ -117,6 +117,7 @@ describe Puppet::Network::FormatHandler do
@format = mock 'format'
@format.stubs(:supported?).returns true
@format.stubs(:name).returns :my_format
+ @format.stubs(:support_stream?).returns false
Puppet::Network::FormatHandler.stubs(:format).with(:my_format).returns @format
Puppet::Network::FormatHandler.stubs(:mime).with("text/myformat").returns @format
Puppet::Network::Format.stubs(:===).returns false
@@ -132,6 +133,15 @@ describe Puppet::Network::FormatHandler do
FormatTester.support_format?(:my_format)
end

+ it "should be able to test whether a format supports streaming" do
+ FormatTester.should respond_to(:support_stream?)
+ end
+
+ it "should use the Format to determine whether a given format supports streaming" do
+ @format.expects(:support_stream?)
+ FormatTester.support_stream?(:my_format)
+ end
+
it "should be able to convert from a given format" do
FormatTester.should respond_to(:convert_from)
end
@@ -141,6 +151,29 @@ describe Puppet::Network::FormatHandler do
FormatTester.convert_from(:my_format, "mydata")
end

+ it "should not decapsulate the content when converting with a format that supports stream" do
+ data = stub_everything 'data'
+ data.expects(:content).never
+ @format.expects(:support_stream?).returns(true)
+ @format.expects(:intern).with(FormatTester, data)
+ FormatTester.convert_from(:my_format, data)
+ end
+
+ it "should not decapsulate the content when converting streamable data" do
+ data = stub_everything 'data', :stream? => true
+ data.expects(:content).never
+ @format.expects(:support_stream?).returns(true)
+ @format.expects(:intern).with(FormatTester, data)
+ FormatTester.convert_from(:my_format, data)
+ end
+
+ it "should decapsulate the content when converting streamable data with a non streamable format" do
+ data = stub_everything 'data', :stream? => true
+ data.expects(:content).returns("mydata")
+ @format.expects(:intern).with(FormatTester, "mydata")
+ FormatTester.convert_from(:my_format, data)
+ end
+
it "should call the format-specific converter when asked to convert from a given format by mime-type" do
@format.expects(:intern).with(FormatTester, "mydata")
FormatTester.convert_from("text/myformat", "mydata")
--
1.6.6.1
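
The decision made by decapsulate can be shown in isolation. This is a sketch under assumptions: `DemoFormat`, `DemoStream` and `unwrap_for` are hypothetical names, and the format lookup done by `protected_format` in the patch is left out.

```ruby
# Hypothetical minimal format: only the streaming flag matters here.
DemoFormat = Struct.new(:name, :streaming) do
  def support_stream?
    streaming
  end
end

# Hypothetical wrapper exposing the two methods the check relies on.
class DemoStream
  def initialize(body)
    @body = body
  end

  def stream?
    true
  end

  def content
    @body
  end
end

# Same decision as decapsulate in the patch, minus the format lookup:
# unwrap to the full body only when the format cannot consume a stream.
def unwrap_for(format, data)
  if !format.support_stream? && data.respond_to?(:stream?) && data.stream?
    data.content
  else
    data
  end
end

stream = DemoStream.new('{"a":1}')
p unwrap_for(DemoFormat.new(:pson, false), stream)        # body as a String
p unwrap_for(DemoFormat.new(:yajl, true), stream).class   # wrapper passed through
p unwrap_for(DemoFormat.new(:pson, false), "plain")       # plain data untouched
```

Existing formats therefore keep receiving a String, and only formats that opt in through `support_stream?` see the wrapper.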

Brice Figureau

Feb 7, 2010, 1:24:50 PM
to puppe...@googlegroups.com
This specific format can unformat/format json in a streaming way.
To activate it: --preferred_serialization_format=yajl

Apparently pson was serializing ruby objects by calling to_s.
Yajl instead puts an object's public properties in a hash. Since ResourceReference
doesn't have a specific to_pson_data_hash, we force it to serialize
as a string, which gives the same result as with pson.
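
The reference handling can be sketched without Puppet or Yajl. In the snippet below, `Reference` is a hypothetical stand-in for Puppet::Resource::Reference, `to_data_hash` is an illustrative name, and the standard library's json encoder is used in place of Yajl purely so the example runs without the gem.

```ruby
require 'json'

# Hypothetical stand-in for Puppet::Resource::Reference.
class Reference
  def initialize(type, title)
    @type = type
    @title = title
  end

  def to_s
    "#{@type}[#{@title}]"
  end
end

# Turn references into strings before the hash reaches the encoder,
# so the encoder never has to guess how to serialize them.
def to_data_hash(params)
  params.each_with_object({}) do |(param, value), hash|
    hash[param] = value.is_a?(Reference) ? value.to_s : value
  end
end

params = { 'ensure' => 'present', 'require' => Reference.new('File', '/etc/hosts') }
puts JSON.generate(to_data_hash(params))
```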

Signed-off-by: Brice Figureau <brice-...@daysofwonder.com>
---

lib/puppet/feature/yajl.rb | 24 ++++++++
lib/puppet/network/formats.rb | 108 +++++++++++++++++++++++++++++++++++
lib/puppet/resource.rb | 2 +-
spec/integration/network/formats.rb | 82 ++++++++++++++++++++++++++-
spec/unit/network/formats.rb | 106 +++++++++++++++++++++++++++++++++-
5 files changed, 317 insertions(+), 5 deletions(-)
create mode 100644 lib/puppet/feature/yajl.rb

diff --git a/lib/puppet/feature/yajl.rb b/lib/puppet/feature/yajl.rb
new file mode 100644
index 0000000..b4d4954
--- /dev/null
+++ b/lib/puppet/feature/yajl.rb
@@ -0,0 +1,24 @@
+require 'puppet/util/feature'
+
+# We want this to load if possible, but it's not automatically
+# required.
+Puppet.features.rubygems?
+Puppet.features.add(:yajl) do
+ found = false
+ begin
+ require 'rubygems'
+ require 'yajl'
+
+ #Yajl::Encoder.enable_json_gem_compatability
+
+ class ::Object
+ def to_pson(*args, &block)
+ "\"#{to_s}\""
+ end
+ end
+
+ found = true
+ rescue LoadError => detail
+ end
+ found
+end
diff --git a/lib/puppet/network/formats.rb b/lib/puppet/network/formats.rb
index a98dcbc..8b9b68d 100644
--- a/lib/puppet/network/formats.rb
+++ b/lib/puppet/network/formats.rb
@@ -186,3 +186,111 @@ Puppet::Network::FormatHandler.create(:pson, :mime => "text/pson", :weight => 10
klass.from_pson(data)
end
end
+
+Puppet::Network::FormatHandler.create(:yajl, :mime => "text/yajl", :weight => 20) do
+ confine :true => Puppet.features.yajl?
+
+ class Puppet::ParsingComplete
+ attr_accessor :result
+
+ def initialize
+ @result = []
+ end
+
+ def complete(object)
+ @result << object
+ end
+ end
+
+ module ::PSON
+ alias :old_parse :parse
+ def parse(string)
+ Yajl::Parser.parse(string)
+ end
+ end
+
+ def parse(content)
+ if content.respond_to?(:stream?)
+ unless content.stream?
+ Yajl::Parser.parse(content.content)
+ else
+ complete = Puppet::ParsingComplete.new
+ parser = Yajl::Parser.new
+ parser.on_parse_complete = complete.method(:complete)
+ content.stream do |r|
+ parser << r
+ end
+ result = complete.result
+ return result.shift if result.size == 1
+ result
+ end
+ else
+ Yajl::Parser.parse(content)
+ end
+ end
+
+ def intern(klass, content)
+ data_to_instance(klass, parse(content))
+ end
+
+ def intern_multiple(klass, content)
+ parse(content).collect do |data|
+ data_to_instance(klass, data)
+ end
+ end
+
+ def render(instance)
+ Yajl::Encoder.encode(instance_to_data(instance))
+ end
+
+ def render_multiple(instances)
+ out = ""
+ encoder = Yajl::Encoder.new
+ instances.collect do |i|
+ out << encoder.encode(instance_to_data(i))
+ end
+ out
+ end
+
+ # supported only for brave souls installing yajl
+ def supported?(klass)
+ Puppet.features.yajl?
+ end
+
+ def support_stream?
+ # of course we do
+ true
+ end
+
+ # If they pass class information, we want to ignore it. By default,
+ # we'll include class information but we won't rely on it - we don't
+ # want class names to be required because we then can't change our
+ # internal class names, which is bad.
+ def data_to_instance(klass, data)
+ if data.is_a?(Hash) and d = data['data']
+ data = d
+ end
+ if data.is_a?(klass)
+ return data
+ end
+ klass.from_pson(data)
+ end
+
+ # recursively call to_pson_data_hash on objects
+ # supporting it
+ def instance_to_data(instance)
+ instance = instance.to_pson_data_hash if instance.respond_to?(:to_pson_data_hash)
+ case instance
+ when Hash
+ instance = instance.inject({}) do |h, (k,v)|
+ h[k] = instance_to_data(v)
+ h
+ end
+ when Array
+ instance.collect! do |i|
+ instance_to_data(i)
+ end
+ end
+ instance
+ end
+end
diff --git a/lib/puppet/resource.rb b/lib/puppet/resource.rb
index 91dd547..2bf99c0 100644
--- a/lib/puppet/resource.rb
+++ b/lib/puppet/resource.rb
@@ -53,7 +53,7 @@ class Puppet::Resource

# Don't duplicate the title as the namevar
next hash if param == namevar and value == title
- hash[param] = value
+ hash[param] = value.is_a?(Puppet::Resource::Reference) ? value.to_s : value
hash
end

diff --git a/spec/integration/network/formats.rb b/spec/integration/network/formats.rb
index 35e7977..6eef347 100755
--- a/spec/integration/network/formats.rb
+++ b/spec/integration/network/formats.rb
@@ -18,11 +18,15 @@ class PsonIntTest
@string = string
end

- def to_pson(*args)
+ def to_pson_data_hash
{
'type' => self.class.name,
'data' => [@string]
- }.to_pson(*args)
+ }
+ end
+
+ def to_pson(*args)
+ to_pson_data_hash.to_pson(*args)
end

def self.canonical_order(s)
@@ -108,3 +112,77 @@ describe Puppet::Network::FormatHandler.format(:pson) do
end
end
end
+
+describe Puppet::Network::FormatHandler.format(:yajl) do
+ describe "when yajl is absent" do
+ confine "'yajl' library is present" => (! Puppet.features.yajl?)
+
+ before do
+ @yajl = Puppet::Network::FormatHandler.format(:yajl)
+ end
+
+ it "should not be suitable" do
+ @yajl.should_not be_suitable
+ end
+
+ it "should not be supported" do
+ @yajl.should_not be_supported
+ end
+ end
+
+ describe "when yajl is available" do
+ confine "Missing 'yajl' library" => Puppet.features.yajl?
+
+ before do
+ @yajl = Puppet::Network::FormatHandler.format(:yajl)
+ end
+
+ it "should be able to render an instance to json" do
+ instance = PsonIntTest.new("foo")
+ PsonIntTest.canonical_order(@yajl.render(instance)).should == PsonIntTest.canonical_order('{"type":"PsonIntTest","data":["foo"]}' )
+ end
+
+ it "should be able to render arrays to json" do
+ @yajl.render([1,2]).should == '[1,2]'
+ end
+
+ it "should be able to render arrays containing hashes to json" do
+ @yajl.render([{"one"=>1},{"two"=>2}]).should == '[{"one":1},{"two":2}]'
+ end
+
+ it "should be able to render multiple instances to json" do
+ Puppet.features.add(:yajl, :libs => %w{yajl})
+
+ one = PsonIntTest.new("one")
+ two = PsonIntTest.new("two")
+
+ PsonIntTest.canonical_order(@yajl.render([one,two])).should == PsonIntTest.canonical_order('[{"type":"PsonIntTest","data":["one"]},{"type":"PsonIntTest","data":["two"]}]')
+ end
+
+ it "should be able to intern a stream" do
+ content = stub 'stream', :stream? => true
+ content.expects(:stream).multiple_yields('{"type":"PsonIntTest",', '"data":["foo"]}')
+ @yajl.intern(PsonIntTest, content).should == PsonIntTest.new("foo")
+ end
+
+ it "should be able to intern json into an instance" do
+ @yajl.intern(PsonIntTest, '{"type":"PsonIntTest","data":["foo"]}').should == PsonIntTest.new("foo")
+ end
+
+ it "should be able to intern json with no class information into an instance" do
+ @yajl.intern(PsonIntTest, '["foo"]').should == PsonIntTest.new("foo")
+ end
+
+ it "should be able to intern multiple instances from json" do
+ @yajl.intern_multiple(PsonIntTest, '[{"type": "PsonIntTest", "data": ["one"]},{"type": "PsonIntTest", "data": ["two"]}]').should == [
+ PsonIntTest.new("one"), PsonIntTest.new("two")
+ ]
+ end
+
+ it "should be able to intern multiple instances from json with no class information" do
+ @yajl.intern_multiple(PsonIntTest, '[["one"],["two"]]').should == [
+ PsonIntTest.new("one"), PsonIntTest.new("two")
+ ]
+ end
+ end
+end

diff --git a/spec/unit/network/formats.rb b/spec/unit/network/formats.rb
index a241306..208c6f4 100755
--- a/spec/unit/network/formats.rb
+++ b/spec/unit/network/formats.rb
@@ -18,11 +18,15 @@ class PsonTest
@string = string
end

- def to_pson(*args)
+ def to_pson_data_hash
{
'type' => self.class.name,
'data' => @string
- }.to_pson(*args)
+ }
+ end
+
+ def to_pson(*args)
+ to_pson_data_hash.to_pson(*args)
end
end

@@ -362,4 +366,102 @@ describe "Puppet Network Format" do
end
end
end
+
+
+ it "should include a yajl format" do
+ Puppet::Network::FormatHandler.format(:yajl).should_not be_nil
+ end
+
+ describe "yajl" do
+ confine "Missing 'yajl' library" => Puppet.features.yajl?
+
+ before do
+ @yajl = Puppet::Network::FormatHandler.format(:yajl)
+ end
+
+ it "should have its mime type set to text/yajl" do
+ Puppet::Network::FormatHandler.format(:yajl).mime.should == "text/yajl"
+ end
+
+ it "should require the :render_method" do
+ Puppet::Network::FormatHandler.format(:yajl).required_methods.should be_include(:render_method)
+ end
+
+ it "should require the :intern_method" do
+ Puppet::Network::FormatHandler.format(:yajl).required_methods.should be_include(:intern_method)
+ end
+
+ it "should have a weight of 20" do
+ @yajl.weight.should == 20
+ end
+
+ describe "when supported" do
+ it "should render by calling 'to_pson_data_hash' on the instance" do
+ instance = PsonTest.new("foo")
+ instance.expects(:to_pson_data_hash).returns "foo"
+ @yajl.render(instance).should == "\"foo\""
+ end
+
+ it "should render multiple instances by calling 'to_pson_data_hash' on each element array" do
+ instance = mock "instance"
+ instances = [instance]
+
+ instance.expects(:to_pson_data_hash).returns "foo"
+
+ @yajl.render_multiple(instances).should == "\"foo\""
+ end
+
+ it "should intern by calling 'Yajl::Parser.parse' on the text and then using from_pson to convert the data into an instance" do
+ content = stub 'foo', :stream? => false, :content => "foo"
+ Yajl::Parser.expects(:parse).with("foo").returns("type" => "PsonTest", "data" => "foo")
+ PsonTest.expects(:from_pson).with("foo").returns "parsed_yajl"
+ @yajl.intern(PsonTest, content).should == "parsed_yajl"
+ end
+
+ it "should intern by calling 'Yajl::Parser.parse' in stream mode if content is streamable" do
+ content = stub 'foo', :stream? => true
+ content.expects(:stream).yields "foo"
+ parser = stub_everything 'parser'
+ Yajl::Parser.expects(:new).returns(parser)
+ parser.expects(:<<).with("foo").returns("type" => "PsonTest", "data" => "foo")
+ @yajl.intern(PsonTest, content)
+ end
+
+ it "should return parsed objects in stream mode" do
+ content = stub 'foo', :stream? => true
+ content.stubs(:stream).yields "foo"
+ parsing_complete = stub 'parsing_complete'
+ parsing_complete.expects(:method)
+ Puppet::ParsingComplete.expects(:new).returns(parsing_complete)
+ parser = stub_everything 'parser'
+ Yajl::Parser.expects(:new).returns(parser)
+ parser.expects(:<<).with("foo").returns("type" => "PsonTest", "data" => "foo")
+ parsing_complete.expects(:result).returns([])
+ @yajl.intern(PsonTest, content)
+ end
+
+ it "should not render twice if 'Yajl::Parser.parse' creates the appropriate instance" do
+ text = stub 'foo', :stream? => false, :content => "foo"
+ instance = PsonTest.new("foo")
+ Yajl::Parser.expects(:parse).with("foo").returns(instance)
+ PsonTest.expects(:from_pson).never
+ @yajl.intern(PsonTest, text).should equal(instance)
+ end
+
+ it "should intern by calling 'Yajl::Parser.parse' on the text and then using from_pson to convert the actual into an instance if the yajl has no class/data separation" do
+ text = stub 'foo', :stream? => false, :content => "foo"
+ Yajl::Parser.expects(:parse).with("foo").returns("foo")
+ PsonTest.expects(:from_pson).with("foo").returns "parsed_yajl"
+ @yajl.intern(PsonTest, text).should == "parsed_yajl"
+ end
+
+ it "should intern multiples by parsing the text and using 'class.intern' on each resulting data structure" do
+ text = stub 'foo', :stream? => false, :content => "foo"
+ Yajl::Parser.expects(:parse).with("foo").returns ["bar", "baz"]
+ PsonTest.expects(:from_pson).with("bar").returns "BAR"
+ PsonTest.expects(:from_pson).with("baz").returns "BAZ"
+ @yajl.intern_multiple(PsonTest, text).should == %w{BAR BAZ}
+ end
+ end
+ end
end
--
1.6.6.1

Brice Figureau

Feb 7, 2010, 1:24:53 PM2/7/10
to puppe...@googlegroups.com
When a file source property requests file content, the request is
routed (indirected) to P::I::FileContent::Rest.
Instead of relying on the general Rest subsystem to get a raw response
containing the full file content, this class creates a special model
wrapping a DeferredResponse.
Later, when the file type wants to write the file to disk, it "streams"
the content through the DeferredResponse.
The DeferredResponse executes the network request lazily. But due
to the poor Net::HTTP implementation, we're forced to use a new Thread
to do the streaming. Each chunk read is pushed onto a SizedQueue, from
which it is fetched by the main thread, which stores it on disk.
The new checksum is computed at the same time, chunk by chunk.

Signed-off-by: Brice Figureau <brice-...@daysofwonder.com>
---
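For readers who want the shape of the mechanism without the Puppet plumbing, here is a minimal sketch of the thread-plus-SizedQueue hand-off described above. It is not the patch's code: `each_chunk` and `stream_copy` are hypothetical names, a StringIO stands in for the HTTP response, and MD5 is only an example digest.

```ruby
require 'thread'
require 'digest/md5'
require 'stringio'

# Hypothetical stand-in for the network side: yields the body in small
# chunks, much as a response body reader would when given a block.
def each_chunk(io, size)
  while (chunk = io.read(size))
    yield chunk
  end
end

# A producer thread pushes chunks into a one-slot queue; the calling thread
# pops them, updates the digest and writes them out. nil marks the end.
def stream_copy(source, sink, chunk_size = 4)
  queue  = SizedQueue.new(1)
  digest = Digest::MD5.new

  producer = Thread.new do
    each_chunk(source, chunk_size) { |chunk| queue << chunk }
    queue << nil
  end

  while (chunk = queue.pop)
    digest.update(chunk)   # checksum computed chunk by chunk
    sink.write(chunk)      # only one chunk is held at a time
  end
  producer.join
  digest.hexdigest
end

source = StringIO.new("streamed file content")
sink   = StringIO.new
sum    = stream_copy(source, sink)
```

With a queue size of 1 the producer blocks until the consumer has taken the previous chunk, so memory use stays bounded by roughly one chunk regardless of file size.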

lib/puppet/file_serving/content_stream.rb | 23 +++
lib/puppet/indirector/file_content/rest.rb | 10 ++-
lib/puppet/network/deferred_response.rb | 94 +++++++++++++
lib/puppet/network/formats.rb | 4 +
lib/puppet/type/file.rb | 19 +++-
spec/integration/network/deferred_response.rb | 34 +++++
spec/unit/file_serving/content_stream.rb | 29 ++++
spec/unit/indirector/file_content/rest.rb | 30 ++++-
spec/unit/network/deferred_response.rb | 182 +++++++++++++++++++++++++
spec/unit/network/formats.rb | 4 +
spec/unit/type/file.rb | 32 +++++
11 files changed, 454 insertions(+), 7 deletions(-)
create mode 100644 lib/puppet/file_serving/content_stream.rb
create mode 100644 lib/puppet/network/deferred_response.rb
create mode 100644 spec/integration/network/deferred_response.rb
create mode 100644 spec/unit/file_serving/content_stream.rb
create mode 100644 spec/unit/network/deferred_response.rb

diff --git a/lib/puppet/file_serving/content_stream.rb b/lib/puppet/file_serving/content_stream.rb
new file mode 100644
index 0000000..2f0b1a7
--- /dev/null
+++ b/lib/puppet/file_serving/content_stream.rb
@@ -0,0 +1,23 @@
+#
+# Created by Luke Kanies on 2007-10-16.
+# Copyright (c) 2007. All rights reserved.
+
+require 'puppet/indirector'
+require 'puppet/file_serving'
+require 'puppet/file_serving/content'
+
+# A class that handles retrieving file contents.
+# It only reads the file when its content is specifically
+# asked for.
+class Puppet::FileServing::ContentStream < Puppet::FileServing::Content
+
+ def self.create(content)
+ instance = new("/this/is/a/fake/path")
+ instance.content = content
+ instance
+ end
+
+ def self.from_raw(content)
+ content
+ end
+end
diff --git a/lib/puppet/indirector/file_content/rest.rb b/lib/puppet/indirector/file_content/rest.rb
index 7b3cade..6da9103 100644
--- a/lib/puppet/indirector/file_content/rest.rb
+++ b/lib/puppet/indirector/file_content/rest.rb
@@ -2,10 +2,18 @@
# Created by Luke Kanies on 2007-10-18.
# Copyright (c) 2007. All rights reserved.

-require 'puppet/file_serving/content'
+require 'puppet/file_serving/content_stream'
require 'puppet/indirector/file_content'
require 'puppet/indirector/rest'
+require 'puppet/network/deferred_response'

class Puppet::Indirector::FileContent::Rest < Puppet::Indirector::REST
desc "Retrieve file contents via a REST HTTP interface."
+
+ # let's cheat a little bit:
+ # instead of returning a "model" we're returning something pretending to be a model
+ # which instead will fire a deferred request when needed
+ def find(request)
+ return Puppet::FileServing::ContentStream.create(Puppet::Network::DeferredResponse.new(request, indirection2uri(request), headers, self))
+ end
end
diff --git a/lib/puppet/network/deferred_response.rb b/lib/puppet/network/deferred_response.rb
new file mode 100644
index 0000000..9175e72
--- /dev/null
+++ b/lib/puppet/network/deferred_response.rb
@@ -0,0 +1,94 @@
+require 'net/http'
+require 'thread'
+
+class Puppet::Network::DeferredResponse
+
+ attr_accessor :request, :uri, :headers, :rest
+ attr_accessor :response
+ attr_accessor :request_started
+
+ def initialize(request, uri, headers, rest)
+ @request = request
+ @uri = uri
+ @headers = headers
+ @rest = rest
+ @mutex = Mutex.new
+ @chunk_queue = SizedQueue.new(1)
+ @got_response = ConditionVariable.new
+ @chunk = nil
+ @request_started = false
+ end
+
+ def stream?
+ true
+ end
+
+ def length
+ start_request
+ response.content_length
+ end
+
+ def start_request
+ # bail out early if we already started the request
+ return if request_started?
+
+ # launch the network request
+ Thread.new do
+ rest.network(request).request_get(uri, headers) do |response|
+ # we got a response from server
+ @mutex.synchronize do
+ @request_started = true
+ @response = response
+ @got_response.signal
+ end
+
+ unless content = rest.deserialize(response)
+ fail "Could not find any content at %s" % request
+ end
+
+ stream_response(content.content)
+ end
+ end
+
+ # wait for the start of the response to be available
+ @mutex.synchronize do
+ @got_response.wait(@mutex) unless @request_started
+ end
+ end
+
+ def stream
+ start_request
+ while true do
+ unless chunk = @chunk_queue.pop
+ # it's the end
+ break
+ end
+
+ @checksum.update(chunk) if @checksum
+
+ yield chunk
+ end
+ return @checksum
+ end
+
+ # if we want the stream to be checksummed on the fly
+ # either set a Digest or a Puppet::Util::ChecksumStream
+ def checksum=(checksum)
+ @checksum = checksum
+ end
+
+ def request_started?
+ @mutex.synchronize do
+ return @request_started
+ end
+ end
+
+ def stream_response(stream)
+ stream.response.read_body do |chunk|
+ # send chunks to our consumer
+ @chunk_queue << chunk
+ end
+ # it's over guys
+ @chunk_queue << nil
+ end
+end
\ No newline at end of file

diff --git a/lib/puppet/network/formats.rb b/lib/puppet/network/formats.rb
index 8b9b68d..19053ec 100644
--- a/lib/puppet/network/formats.rb
+++ b/lib/puppet/network/formats.rb
@@ -143,6 +143,10 @@ Puppet::Network::FormatHandler.create(:raw, :mime => "application/x-raw", :weigh
raise NotImplementedError
end

+ def support_stream?
+ true
+ end
+
# LAK:NOTE The format system isn't currently flexible enough to handle
# what I need to support raw formats just for individual instances (rather
# than both individual and collections), but we don't yet have enough data
diff --git a/lib/puppet/type/file.rb b/lib/puppet/type/file.rb
index 2f5b5df..3d3b8b6 100644
--- a/lib/puppet/type/file.rb
+++ b/lib/puppet/type/file.rb
@@ -710,7 +710,11 @@ module Puppet
if validate = validate_checksum?
# Use the appropriate checksum type -- md5, md5lite, etc.
sumtype = property(:checksum).checktype
- checksum ||= "{#{sumtype}}" + property(:checksum).send(sumtype, content)
+ if content.is_a?(String)
+ checksum ||= "{#{sumtype}}" + property(:checksum).send(sumtype, content)
+ else
+ content.checksum = property(:checksum).send("#{sumtype}_stream")
+ end
end

remove_existing(:file)
@@ -729,7 +733,18 @@ module Puppet
umask = mode ? 000 : 022

Puppet::Util.withumask(umask) do
- File.open(path, File::CREAT|File::WRONLY|File::TRUNC, mode) { |f| f.print content }
+ File.open(path, File::CREAT|File::WRONLY|File::TRUNC, mode) do |f|
+ if content.is_a?(String)
+ f.print content
+ else # we're in front of a stream
+ newchecksum = content.stream do |c|
+ f.print c
+ end
+ if validate and newchecksum
+ checksum ||= "{#{sumtype}}" + newchecksum.checksum
+ end
+ end
+ end
end

# And put our new file in place
diff --git a/spec/integration/network/deferred_response.rb b/spec/integration/network/deferred_response.rb
new file mode 100644
index 0000000..01cb6ba
--- /dev/null
+++ b/spec/integration/network/deferred_response.rb
@@ -0,0 +1,34 @@
+#!/usr/bin/env ruby
+
+require File.dirname(__FILE__) + '/../../spec_helper'
+
+require 'puppet/network/deferred_response'
+
+describe Puppet::Network::DeferredResponse do
+ before(:each) do
+ @request = stub_everything 'request'
+ @uri = stub_everything 'uri'
+ @headers = stub_everything 'headers'
+
+ @request = stub_everything 'request'
+ @http = stub_everything 'http'
+ @rest = stub_everything 'rest', :network => @http
+
+ @response = stub_everything 'response'
+ @http.stubs(:request_get).yields(@response)
+
+ @stream = stub_everything 'stream', :response => @response
+ @response.stubs(:read_body).multiple_yields("chunk1", "chunk2")
+ @content = stub_everything 'content', :content => @stream
+
+ @rest.stubs(:deserialize).returns(@content)
+
+ @deferred = Puppet::Network::DeferredResponse.new(@request, @uri, @headers, @rest)
+ end
+
+ it "should pass chunks to the main thread" do
+ @deferred.stream do |c|
+ c.should match /^chunk\d/
+ end
+ end
+end
\ No newline at end of file

diff --git a/spec/unit/file_serving/content_stream.rb b/spec/unit/file_serving/content_stream.rb
new file mode 100644
index 0000000..b0d1874
--- /dev/null
+++ b/spec/unit/file_serving/content_stream.rb
@@ -0,0 +1,29 @@
+#!/usr/bin/env ruby
+
+require File.dirname(__FILE__) + '/../../spec_helper'
+
+require 'puppet/file_serving/content_stream'
+
+describe Puppet::FileServing::ContentStream do
+ it "should be a subclass of Content" do
+ Puppet::FileServing::ContentStream.superclass.should equal(Puppet::FileServing::Content)
+ end
+
+ it "should be able to create a content instance" do
+ Puppet::FileServing::ContentStream.should respond_to(:create)
+ end
+
+ it "should return the content itself when converting from raw" do
+ content = stub 'content'
+ Puppet::FileServing::ContentStream.from_raw(content).should == content
+ end
+
+ it "should create an instance with a fake file name and correct content when converting from raw" do
+ instance = mock 'instance'
+ Puppet::FileServing::ContentStream.expects(:new).with("/this/is/a/fake/path").returns instance
+
+ instance.expects(:content=).with "foo/bar"
+
+ Puppet::FileServing::ContentStream.create("foo/bar").should equal(instance)
+ end
+end
diff --git a/spec/unit/indirector/file_content/rest.rb b/spec/unit/indirector/file_content/rest.rb
index afb674e..4fa0d20 100755
--- a/spec/unit/indirector/file_content/rest.rb
+++ b/spec/unit/indirector/file_content/rest.rb
@@ -2,10 +2,32 @@

require File.dirname(__FILE__) + '/../../../spec_helper'

-require 'puppet/indirector/file_content'
+require 'puppet/indirector/file_content/rest'
+require 'puppet/file_serving/content'
+require 'puppet/file_serving/content_stream'

-describe "Puppet::Indirector::Content::Rest" do
- it "should add the node's cert name to the arguments"
+describe "Puppet::Indirector::FileContent::Rest" do
+ it "should be a subclass of Puppet::Indirector::REST" do
+ Puppet::Indirector::FileContent::Rest.superclass.should equal(Puppet::Indirector::REST)
+ end

- it "should set the content type to text/plain"
+ describe "when finding" do
+ before(:each) do
+ @request = stub_everything 'request'
+ Puppet::FileServing::Content.terminus_class = :rest
+ @indirector = Puppet::Indirector::FileContent::Rest.new
+ @indirector.stubs(:indirection2uri).with(@request).returns("/here")
+ Puppet::Network::DeferredResponse.stubs(:new)
+ end
+
+ it "should return a Puppet::FileServing::ContentStream as model" do
+ @indirector.find(@request).should be_instance_of(Puppet::FileServing::ContentStream)
+ end
+
+ it "should return a content stream wrapping a deferred response" do
+ content = stub_everything 'content'
+ Puppet::Network::DeferredResponse.expects(:new).returns content
+ @indirector.find(@request).content.should == content
+ end
+ end
end

diff --git a/spec/unit/network/deferred_response.rb b/spec/unit/network/deferred_response.rb
new file mode 100644
index 0000000..858229e
--- /dev/null
+++ b/spec/unit/network/deferred_response.rb
@@ -0,0 +1,182 @@
+#!/usr/bin/env ruby
+
+require File.dirname(__FILE__) + '/../../spec_helper'
+
+require 'puppet/network/deferred_response'
+
+describe Puppet::Network::DeferredResponse do
+ before(:each) do
+ @request = stub_everything 'request'
+ @uri = stub_everything 'uri'
+ @headers = stub_everything 'headers'
+
+ @request = stub_everything 'request'
+ @http = stub_everything 'http', :request_get => @request
+ @rest = stub_everything 'rest', :network => @http
+
+ @chunk_queue = stub_everything 'chunk_queue'
+ SizedQueue.expects(:new).returns(@chunk_queue)
+
+ @got_response = stub_everything 'got_response'
+ ConditionVariable.stubs(:new).returns(@got_response)
+
+ @mutex = stub_everything 'mutex'
+ @mutex.stubs(:synchronize).yields
+ Mutex.expects(:new).returns(@mutex)
+
+ @deferred = Puppet::Network::DeferredResponse.new(@request, @uri, @headers, @rest)
+ end
+
+ it "should be a 'stream'" do
+ @deferred.should be_stream
+ end
+
+ it "should be possible to set a checksum" do
+ @deferred.should be_respond_to(:checksum=)
+ end
+
+ describe "when getting length" do
+ before(:each) do
+ @response = stub_everything 'response'
+ @deferred.response = @response
+ @deferred.stubs(:start_request)
+ end
+
+ it "should start the network request" do
+ @deferred.expects(:start_request)
+ @deferred.length
+ end
+
+ it "should return the response content length" do
+ @response.expects(:content_length).returns 10
+ @deferred.length.should == 10
+ end
+ end
+
+ describe "when streaming" do
+ before(:each) do
+ @deferred.stubs(:start_request)
+ end
+
+ it "should start the network request" do
+ @deferred.expects(:start_request)
+ @deferred.stream
+ end
+
+ it "should fetch one chunk from the chunk queue" do
+ @chunk_queue.expects(:pop).twice.returns("chunk", nil)
+ @deferred.stream { |c| }
+ end
+
+ it "should fetch all chunks from the chunk queue until nil" do
+ @chunk_queue.expects(:pop).times(3).returns("chunk1", "chunk2", nil)
+ @deferred.stream { |c|
+ c.should match /^chunk\d/
+ }
+ end
+
+ it "should give the chunk to the current checksum stream" do
+ checksum = stub_everything 'checksum'
+ @deferred.checksum = checksum
+
+ @chunk_queue.expects(:pop).twice.returns("chunk", nil)
+ checksum.expects(:update).with("chunk")
+
+ @deferred.stream { |c| }
+ end
+
+ it "should yield the chunks to the given block" do
+ @chunk_queue.expects(:pop).twice.returns("chunk", nil)
+
+ @deferred.stream { |c|
+ c.should == "chunk"
+ }
+ end
+
+ it "should return the checksum" do
+ checksum = stub_everything 'checksum'
+ @deferred.checksum = checksum
+
+ @deferred.stream.should == checksum
+ end
+ end
+
+ describe "when issuing the network request" do
+ before(:each) do
+ Thread.stubs(:new).yields(nil)
+ @stream = stub_everything 'stream'
+ @content = stub_everything 'content', :content => @stream
+ @response = stub_everything 'response'
+ @content.stubs(:response).returns(@response)
+ end
+
+ it "should return early if the request has already been started" do
+ @deferred.expects(:request_started?).returns(true)
+ @rest.expects(:network).never
+ @deferred.start_request
+ end
+
+ it "should launch the request in a new Thread" do
+ Thread.expects(:new).yields
+ @rest.expects(:network).with(@request).returns(@http)
+ @http.expects(:request_get).with(@uri, @headers)
+ @deferred.start_request
+ end
+
+ describe "and the network thread" do
+ before(:each) do
+ Thread.expects(:new).yields
+ @rest.stubs(:network).with(@request).returns(@http)
+ @rest.stubs(:deserialize).returns(@content)
+ @http.stubs(:request_get).with(@uri, @headers).yields(@response)
+ @deferred.stubs(:stream_response)
+ end
+
+ it "should let everyone know the request has started" do
+ @deferred.start_request
+ @deferred.request_started?.should be true
+ end
+
+ it "should signal other threads that the response is ready" do
+ @got_response.expects(:signal)
+ @deferred.start_request
+ end
+
+ it "should store the current response" do
+ @deferred.start_request
+ @deferred.response.should == @response
+ end
+
+ it "should deserialize the response" do
+ @rest.expects(:deserialize).with(@response).returns(@content)
+ @deferred.start_request
+ end
+
+ it "should stream the response content" do
+ @deferred.expects(:stream_response).with(@stream)
+ @deferred.start_request
+ end
+ end
+
+ describe "and the other thread" do
+ it "should finally wait the request to be started" do
+ @got_response.expects(:wait)
+ @deferred.start_request
+ end
+ end
+
+ describe "when streaming the response content" do
+ it "should put each chunk in the chunk queue" do
+ @response.expects(:read_body).multiple_yields("chunk1","chunk2")
+ @chunk_queue.expects(:<<).with("chunk1").then.with("chunk2")
+ @deferred.stream_response(@content)
+ end
+
+ it "should enqueue nil at the end" do
+ @chunk_queue.expects(:<<).with(nil)
+ @deferred.stream_response(@content)
+ end
+ end
+ end
+
+end
\ No newline at end of file

diff --git a/spec/unit/network/formats.rb b/spec/unit/network/formats.rb
index 208c6f4..c1f74f9 100755
--- a/spec/unit/network/formats.rb
+++ b/spec/unit/network/formats.rb
@@ -280,6 +280,10 @@ describe "Puppet Network Format" do
@format.should be_supported(String)
end

+ it "should always support streaming" do
+ @format.should be_support_stream
+ end
+
it "should fail if its multiple_render method is used" do
lambda { @format.render_multiple("foo") }.should raise_error(NotImplementedError)
end
diff --git a/spec/unit/type/file.rb b/spec/unit/type/file.rb
index 1b3fe6a..49c1d34 100755
--- a/spec/unit/type/file.rb
+++ b/spec/unit/type/file.rb
@@ -774,5 +774,37 @@ describe Puppet::Type.type(:file) do

lambda { file.write("something", :content) }.should raise_error(Puppet::Error)
end
+
+ it "should stream streamable content" do
+ file = Puppet::Type::File.new(:name => "/my/file", :backup => "puppet")
+ f = stub_everything 'file'
+ File.stubs(:open).yields(f)
+ File.stubs(:rename)
+ content = stub_everything 'content'
+
+ content.expects(:stream).yields("chunk")
+ f.expects(:print).with("chunk")
+
+ file.write(content, :content)
+ end
+
+ it "should use the streamed checksum" do
+ file = Puppet::Type::File.new(:name => "/my/file", :backup => "puppet")
+ f = stub_everything 'file'
+ File.stubs(:open).yields(f)
+ File.stubs(:rename)
+ file.stubs(:validate_checksum?).returns(true)
+ file.stubs(:fail_if_checksum_is_wrong)
+ content = stub_everything 'content'
+ property = stub_everything 'property'
+ file.stubs(:property).returns property
+ property.stubs(:checktype).returns(:md5)
+ checksum = stub_everything 'checksum', :checksum => "DEADBEEF"
+ content.stubs(:stream).returns(checksum)
+
+ file.expects(:setchecksum).with("{md5}DEADBEEF")
+
+ file.write(content, :content)

Nigel Kersten

Feb 8, 2010, 12:48:48 PM2/8/10
to puppe...@googlegroups.com
:-O

indeed!

 

--
nigel

Markus Roberts

Feb 9, 2010, 1:32:32 PM2/9/10
to puppet-dev
I am confused about a number of things with this patch; I'll pick out
one that seems the easiest to explain in the hopes that the answer
will align my thinking:

I don't see how the following could work:

def find(request)
- return nil unless result =
deserialize(network(request).get(indirection2uri(request), headers))
+ return nil unless result =
network(request).request_get(indirection2uri(request), headers) do
|response|
+ return deserialize(response)
+ end
result.name = request.key
result
end

Specifically, if the block containing "return deserialize(response)"
is called/yielded "live" I believe it should exit from the find method
without the result name being set to the result key, whereas if it is
stashed and called later (after find has returned) it should produce a
LocalJumpError.

If this code works, I'm obviously missing something. Any idea what?

-- Markus

Markus Roberts

Feb 9, 2010, 1:46:01 PM2/9/10
to puppet-dev

More explicitly, consider this code:

def use_block
   yield
end

def foo(x)
   use_block { return x*x }
   2*x
end

def store_block(&block)
   $stored_block = block
end

def bar(x)
   store_block { return x*x }
   2*x
end
   
puts foo(7)
puts bar(11)
puts $stored_block.call

It produces the following output:

49
22
lambda-do.rb:15:in `bar': unexpected return (LocalJumpError)
    from lambda-do.rb:21:in `call'
    from lambda-do.rb:21

The foo(7) (use immediately) case returns the result from the block (x*x, or 49) and never gets to the last bit (the 2*x); the bar(11) case returns the 2*x but errors out when you try to call the stored block.

-- Markus

Brice Figureau

Feb 9, 2010, 1:47:17 PM2/9/10
to puppe...@googlegroups.com
On 09/02/10 19:32, Markus Roberts wrote:
> I am confused about a number of things with this patch; I'll pick out
> one that seems the easiest to explain in the hopes that the answer
> will align my thinking:
>
> I don't see how the following could work:
>
> def find(request)
> - return nil unless result =
> deserialize(network(request).get(indirection2uri(request), headers))
> + return nil unless result =
> network(request).request_get(indirection2uri(request), headers) do
> |response|
> + return deserialize(response)
> + end
> result.name = request.key
> result
> end

What I understood is that returning a value in this block would end up
in "result". And indeed I was able to deserialize catalogs, certificates
and such, so I suppose it is working fine.

> Specifically, if the block containing "return deserialize(response)"
> is called/yielded "live" I believe it should exit from the find method
> without the result name being set to the result key, whereas if it is
> stashed and called later (after find has returned) it should produce a
> LocalJumpError.

To me it would return from the method that yields it (ie the
request_get), but that's a wild guess.

> If this code works, I'm obviously missing something. Any idea what?

Your ruby is way better than mine, so I'd tend to trust what you say :-)

So you would write the code like this (sorry for the wrapping)?
network(request).request_get(indirection2uri(request), headers) do
|response|
result = deserialize(response)
end
return nil unless result
result.name = request.key
result

If that's preferable, this is an easy change.
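
The two behaviours under discussion can be checked in isolation. The sketch below is an illustration only: `with_response` is a hypothetical stand-in for a method shaped like request_get, not the real Net::HTTP call. It shows that `return` inside a block leaves the enclosing method, and that a variable assigned inside a block is only visible afterwards if it was declared before the block.

```ruby
# Hypothetical stand-in for a method like request_get: it yields to the
# caller's block and then returns its own value, not the block's.
def with_response
  yield "raw body"
  :value_of_with_response
end

# `return` inside the block exits find_with_return itself, so the line
# after the block is never reached.
def find_with_return
  result = with_response { |body| return body.upcase }
  "never reached: #{result}"
end

# Declaring `result` before the block lets the block assign to it; without
# that first line, `result` would be local to the block.
def find_with_assignment
  result = nil
  with_response { |body| result = body.upcase }
  return nil unless result
  "named:#{result}"
end

with_return     = find_with_return
with_assignment = find_with_assignment
```

So the assignment form does what the rewrite intends, provided `result` is initialised (for example to nil) before the block.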
--
Brice Figureau
My Blog: http://www.masterzen.fr/

Brice Figureau

Feb 9, 2010, 1:49:26 PM2/9/10
to puppe...@googlegroups.com

So my code is incorrect because it would skip the end of
request_get, which certainly closes/finishes network things.
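
A small sketch of what gets skipped, under stated assumptions: `fake_request` is a hypothetical method with the same shape as a request helper (work after the yield, plus an ensure clause). It is not Net::HTTP, and whether Net::HTTP's own cleanup sits after the yield or in an ensure clause is not verified here.

```ruby
# Hypothetical transport: logs what it does around the yield.
def fake_request(log)
  log << :open
  yield "response"
  log << :finish_after_yield   # skipped when the block does `return`
ensure
  log << :ensure_cleanup       # still runs while the stack unwinds
end

# The block returns from early_return, cutting fake_request short.
def early_return(log)
  fake_request(log) { |r| return r }
  log << :after_request
end

# The block just evaluates to a value, so fake_request runs to the end.
def normal_block(log)
  fake_request(log) { |r| r }
  log << :after_request
end

early_log   = []
early_value = early_return(early_log)
normal_log  = []
normal_block(normal_log)
```

In this sketch only the ensure clause survives the early return; anything the yielding method does after the yield is lost.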

Markus Roberts

Feb 9, 2010, 2:18:37 PM2/9/10
to puppet-dev
>> If this code works, I'm obviously missing something. Any idea what?
>
> Your ruby is way better than mine, so I'd tend to trust what you say :-)

I don't always trust what I say, so no reason you should.

If the code overall appears to produce the correct results, I suspect that it isn't doing exactly what either of us expects; is it possible that it's not getting to that code, or could some threading issue be confusing the situation?

-- Markus

Brice Figureau

Feb 9, 2010, 4:22:43 PM2/9/10
to puppe...@googlegroups.com

Obviously the response gets deserialized, so I'd say it works.
It's just that, based on your sample, it seems I'm short-circuiting the
Net::HTTP "request shutdown", which is certainly bad :-)

I'll fix this soon in my repository.

Brice Figureau

Feb 9, 2010, 4:59:18 PM2/9/10
to puppe...@googlegroups.com
On 09/02/10 19:32, Markus Roberts wrote:
> I am confused about a number of things with this patch; I'll pick out
> one that seems the easiest to explain in the hopes that the answer
> will align my thinking:
>
> I don't see how the following could work:
>
> def find(request)
> - return nil unless result =
> deserialize(network(request).get(indirection2uri(request), headers))
> + return nil unless result =
> network(request).request_get(indirection2uri(request), headers) do
> |response|
> + return deserialize(response)
> + end
> result.name = request.key
> result
> end

I rewrote this part as I sent earlier, and indeed it still works. The
difference is that the old version never reached the result.name= line.
The rewrite allowed me to discover that result.name= was failing hard
when trying to fetch file metadata (those have no "name"). This is also
fixed in the new patch revision.
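
The new revision is not shown in this thread, so the following is only a guess at the kind of guard that avoids the failure: set the name only when the model has a name= writer. `Named`, `Nameless` and `finish_find` are hypothetical names used for illustration.

```ruby
# Hypothetical models: one with a name attribute, one without.
Named    = Struct.new(:name, :data)
Nameless = Struct.new(:data)

# Assumed guard: only assign the key when the result accepts a name.
def finish_find(result, key)
  return nil unless result
  result.name = key if result.respond_to?(:name=)
  result
end

named    = finish_find(Named.new(nil, "catalog"), "node1")
nameless = finish_find(Nameless.new("metadata"), "node1")
missing  = finish_find(nil, "node1")
```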

Luke Kanies

Mar 15, 2010, 6:40:09 PM3/15/10
to puppe...@googlegroups.com

[...]

Hi Brice (and Markus, I guess),

What's the status of this patch series? I'd love to get this into
rowlf if at all possible, and I'll work with you to get it done if you
need some backup on it.

--
I respect faith, but doubt is what gets you an education.
-- Wilson Mizner
---------------------------------------------------------------------
Luke Kanies -|- http://reductivelabs.com -|- +1(615)594-8199

Markus Roberts

Mar 15, 2010, 7:55:53 PM3/15/10
to puppet-dev

We're intending to include it (or some variation thereof) if at all possible. Rein and I chatted about it over the weekend, and he bookmarked interest in proceeding after he gets through his dashboard TTD list (probably tomorrow or Wednesday, according to his status right after lunch today).

