Pastie now auto-senses if line-wrap is a bad or good idea. Feedback?
## mark a section (Learn more)
require 'thread' module Pic class Receiver def initialize(*args, &block) #:nodoc: @waiters = [] if args.empty? if block yield self else @waiters << [[], nil] end else @waiters << [args, block] end end def match(*args, &block) @waiters << [args, block] nil end alias :when :match def process(message) #:nodoc: if matched = @waiters.find { |against, block| match?(message, against) } block = matched.last [block ? block.call(*message) : message] end end private def match?(message, against) against.each_with_index do |value, i| return false unless value === message[i] end end end class Proxy def initialize(process) @process = process end def send(*args) puts "Sending to #{self}: #{args.inspect[0..200]}" if $DEBUG @process.send :__queue, args end def to_s @process.to_s end def ==(other) return false unless other other.instance_variable_get(:@process) == @process end end class Process < Thread GROUP = ThreadGroup.new class << self def list() @processes.values end def start(*args, &callable) unless callable callable = args.shift raise ArgumentError, "Must provide block or callable first argument" unless callable.respond_to?(:call) end super do |*args| process = Thread.current GROUP.add process Thread.exclusive do pid = @last_pid += 1 process.instance_variable_set(:@pid, pid) @processes[pid] = process end begin process.instance_variable_set(:@queue, Queue.new) callable[*args] while tail = process.instance_variable_get(:@tail) process.instance_variable_set(:@tail, nil) tail.first[*tail.last] end rescue => ex if $DEBUG $stderr.puts "Error in process #{self}:" $stderr.puts "#{self}: #{ex.message}" $stderr.puts "#{self}: #{ex.backtrace[0..20]}" end ensure Thread.exclusive { @processes.delete(process.pid) } end end end end @processes = {} @last_pid = 0 def pid @pid end def receive(*args, &block) raise RuntimeError, "Can only call on current thread" unless Thread.current == self receiver = Pic::Receiver.new(*args, &block) puts "#{self}: in receive" if $DEBUG skipped = nil queue = @queue begin loop do message = queue.deq result = receiver.process(message) return result.first if result skipped ||= [] skipped << message end ensure skipped.each { |msg| queue.enq msg } if skipped end end def tail(*args, &callable) raise RuntimeError, "Can only call on current thread" unless Thread.current == self unless callable callable = args.shift raise ArgumentError, "Must provide block or callable first argument" unless callable.respond_to?(:call) end @tail = [callable, args] end def proxy Pic::Proxy.new(self) end private def __queue(message) @queue.enq message end END { loop do thread = Thread.exclusive { @processes.find { true } } break unless thread thread.last.join end } end end module Kernel private def spawn(*args, &block) Pic::Proxy.new(Pic::Process.start(*args, &block)) end def receive(*args, &block) Thread.current.receive(*args, &block) end def current() Thread.current.proxy end def tail(*args, &block) Thread.current.tail(*args, &block) end end
This paste will be private.
From the Design Piracy series on my blog: