Race Condition in Ruby GServer

I've been playing around with Ruby lately, trying to write a UDP server. Of course, the way to write servers in Ruby is apparently (I'm still a bit new at this Ruby malarky) to subclass GServer.

And what a fine piece of code GServer is! Really does the job and shows just how easy it is to do stuff in Ruby. If you want to a TCP server.

But I needed a UDP server. So after much messing about I came up with the following half-baked solution. Please feel free to tear to pieces. I should remind you however that this code is WOMM certified.

# with apologies to John W. Small

require "socket"
require "thread"

class UServer

  DEFAULT_HOST = "127.0.0.1"
  DEFAULT_TRANSPORT = "TCP";

  def serve(io,content)
  end

  @@services = {}   # Hash of opened ports, i.e. services
  @@servicesMutex = Mutex.new

  def UServer.stop(port, 
                   host = DEFAULT_HOST, 
                   trans = DEFAULT_TRANSPORT)
    @@servicesMutex.synchronize {
      @@services[host][port][trans].stop
    }
  end

  def UServer.in_service?(port, 
                          host = DEFAULT_HOST, 
                          trans = DEFAULT_TRANSPORT)
    @@services.has_key?(host) and 
      @@services[host].has_key?(port) and
      @@services[host][port].has_key?(trans)
  end

  def stop
    @connectionsMutex.synchronize  {
      if @serverThread
        @serverThread.raise "stop"
      end
    }
  end

  def stopped?
    nil == @serverThread
  end

  def shutdown
    @shutdown = true
  end

  def connections
    @connections.size
  end

  def join
    @serverThread.join if @serverThread
  end

  attr_reader :port, :host, :trans, :maxConnections
  attr_accessor :stdlog, :audit, :debug

  def connecting(client)
    addr = client.peeraddr
    log("#{self.class.to_s} #{@host}:#{@port} #{@trans} client:#{addr[1]} " +
        "#{addr[2]}<#{addr[3]}> connect")
    true
  end

  def disconnecting(clientPort)
    log("#{self.class.to_s} #{@host}:#{@port} #{@trans}" +
      "client:#{clientPort} disconnect")
  end

  protected :connecting, :disconnecting

  def starting()
    log("#{self.class.to_s} #{@host}:#{@port} #{@trans} start")
  end

  def stopping()
    log("#{self.class.to_s} #{@host}:#{@port} #{@trans} stop")
  end

  protected :starting, :stopping


  def error(detail)
    log(detail.backtrace.join("n"))
  end

  def log(msg)
    if @stdlog
      @stdlog.puts("[#{Time.new.ctime}] %s" % msg)
      @stdlog.flush
    end
  end

  protected :error, :log

  def initialize(port, host = DEFAULT_HOST, trans = DEFAULT_TRANSPORT, maxConnections = 4,
    stdlog = $stderr, audit = false, debug = false)
    @serverThread = nil
    @port = port
    @host = host
    @trans = trans
    @maxConnections = maxConnections
    @connections = []
    @connectionsMutex = Mutex.new
    @connectionsCV = ConditionVariable.new
    @stdlog = stdlog
    @audit = audit
    @debug = debug
  end


  def start(maxConnections = -1)
    raise "running" if !stopped?
    @shutdown = false
    @maxConnections = maxConnections if maxConnections > 0
    @@servicesMutex.synchronize  {
      if UServer.in_service?(@port,@host,@trans)
        raise "Port already in use: #{host}:#{@port} #{@trans}!"
      end

      if "TCP" == @trans
        @server = ContentTCPServer.new(@host,@port)
      else
        @server = ContentUDPServer.new(@host,@port)
      end

      @@services[@host] = {} unless @@services.has_key?(@host)
      @@services[@host][@port] = {} unless @@services[@host].has_key?(@port)
      @@services[@host][@port][@trans] = self;
    }
    @serverThread = Thread.new {
      begin
        starting if @audit
        while !@shutdown
          @connectionsMutex.synchronize  {
          puts "start @con.size=#{@connections.size}"
             while @connections.size >= @maxConnections
               @connectionsCV.wait(@connectionsMutex)
             end
          }

          client, port, close, content = @server.accept

          Thread.new(client,port,close,content)  { 
            |myClient, myPort, myClose, myContent|

            @connectionsMutex.synchronize {
              @connections << Thread.current
            }
            begin
              serve(myClient,myContent) if !@audit or connecting(myClient)
              puts "finished serve"
            rescue => detail
              error(detail) if @debug
            ensure
              begin
                if myClose
                  myClient.close
              end
              rescue
              end

              @connectionsMutex.synchronize {
                @connections.delete(Thread.current)
                @connectionsCV.signal
              }

              disconnecting(myPort) if @audit
            end
          }
        end
      rescue => detail
        error(detail) if @debug
      ensure
        begin
          @server.close
        rescue
        end
        if @shutdown
          @connectionsMutex.synchronize  {
             while @connections.size > 0
               @connectionsCV.wait(@connectionsMutex)
             end
          }
        else
          @connections.each { |c| c.raise "stop" }
        end
        @serverThread = nil
        @@servicesMutex.synchronize  {
          @@services[@host][@port].delete(@trans)
        }
        stopping if @audit
      end
    }
    self
  end

end


class ContentTCPServer

  def initialize( host, port )
    @server = TCPServer.new(host,port)
  end
  
  def accept
    client = @server.accept
    return [client,client.peeraddr[1],true,client.gets(nil)]
  end  

  def close 
    @server.close
  end

end


class ContentUDPServer

  def initialize( host, port )
    puts "init"

    @socket = UDPSocket.new
    puts "new s: #{@socket} on #{host}:#{port}"

    @socket.bind(host, port)
    puts "bound: #{@socket}"
  end
  
  def accept
    puts "accept"

    packet = @socket.recvfrom(1024)
    return [@socket, 0, false, packet[0]]
  end  

  def close 
    @socket.close
  end

end

Lovely! Who says Java can't be translated into Ruby? I even used duck typing!

Anyway, see if you can spot the significant difference in thread handling between this and the original GServer.

Well, OK, here it is:

GServer says:

@connections << Thread.new(client)  { |myClient|
  ...
}

I say:

Thread.new(client,port,close,content)  { |myClient, myPort, myClose, myContent|
  @connectionsMutex.synchronize {
    @connections << Thread.current
  }
  ...
}

What happened was that UDP packets were handled so quickly that the thread was (mostly) never placed in the @connections list until after it was (supposedly) removed from the list by

@connectionsMutex.synchronize {
  @connections.delete(Thread.current)
  @connectionsCV.signal
}

at the end of the request thread. Seems like a nasty race condition to me. My code just makes sure that the thread is placed into the @connections list before nasty stuff can happen.

Am I right? You tell me…




This entry was posted in General. Bookmark the permalink.

Leave a Reply

Your email address will not be published.