I've been playing around with Ruby lately, trying to write a UDP server. Of course, the way to write servers in Ruby is apparently (I'm still a bit new at this Ruby malarkey) to subclass GServer.
And what a fine piece of code GServer is! It really does the job and shows just how easy it is to do stuff in Ruby. If you want a TCP server, that is.
But I needed a UDP server. So after much messing about I came up with the following half-baked solution. Please feel free to tear it to pieces. I should remind you, however, that this code is WOMM certified.
# with apologies to John W. Small
require "socket"
require "thread"

class UServer
  DEFAULT_HOST = "127.0.0.1"
  DEFAULT_TRANSPORT = "TCP"

  # Does nothing here; a subclass overrides this to handle one request.
  def serve(io,content)
  end

  @@services = {} # Hash of opened ports, i.e. services
  @@servicesMutex = Mutex.new

  def UServer.stop(port,
                   host = DEFAULT_HOST,
                   trans = DEFAULT_TRANSPORT)
    @@servicesMutex.synchronize {
      @@services[host][port][trans].stop
    }
  end

  def UServer.in_service?(port,
                          host = DEFAULT_HOST,
                          trans = DEFAULT_TRANSPORT)
    @@services.has_key?(host) and
      @@services[host].has_key?(port) and
      @@services[host][port].has_key?(trans)
  end

  def stop
    @connectionsMutex.synchronize {
      if @serverThread
        @serverThread.raise "stop"
      end
    }
  end

  def stopped?
    nil == @serverThread
  end

  def shutdown
    @shutdown = true
  end

  def connections
    @connections.size
  end

  def join
    @serverThread.join if @serverThread
  end

  attr_reader :port, :host, :trans, :maxConnections
  attr_accessor :stdlog, :audit, :debug

  def connecting(client)
    addr = client.peeraddr
    log("#{self.class.to_s} #{@host}:#{@port} #{@trans} client:#{addr[1]} " +
        "#{addr[2]}<#{addr[3]}> connect")
    true
  end

  def disconnecting(clientPort)
    log("#{self.class.to_s} #{@host}:#{@port} #{@trans} " +
        "client:#{clientPort} disconnect")
  end

  protected :connecting, :disconnecting

  def starting()
    log("#{self.class.to_s} #{@host}:#{@port} #{@trans} start")
  end

  def stopping()
    log("#{self.class.to_s} #{@host}:#{@port} #{@trans} stop")
  end

  protected :starting, :stopping

  def error(detail)
    log(detail.backtrace.join("\n"))
  end

  def log(msg)
    if @stdlog
      @stdlog.puts("[#{Time.new.ctime}] %s" % msg)
      @stdlog.flush
    end
  end

  protected :error, :log

  def initialize(port, host = DEFAULT_HOST, trans = DEFAULT_TRANSPORT, maxConnections = 4,
                 stdlog = $stderr, audit = false, debug = false)
    @serverThread = nil
    @port = port
    @host = host
    @trans = trans
    @maxConnections = maxConnections
    @connections = []
    @connectionsMutex = Mutex.new
    @connectionsCV = ConditionVariable.new
    @stdlog = stdlog
    @audit = audit
    @debug = debug
  end

  def start(maxConnections = -1)
    raise "running" if !stopped?
    @shutdown = false
    @maxConnections = maxConnections if maxConnections > 0
    @@servicesMutex.synchronize {
      if UServer.in_service?(@port,@host,@trans)
        raise "Port already in use: #{@host}:#{@port} #{@trans}!"
      end
      if "TCP" == @trans
        @server = ContentTCPServer.new(@host,@port)
      else
        @server = ContentUDPServer.new(@host,@port)
      end
      @@services[@host] = {} unless @@services.has_key?(@host)
      @@services[@host][@port] = {} unless @@services[@host].has_key?(@port)
      @@services[@host][@port][@trans] = self
    }
    @serverThread = Thread.new {
      begin
        starting if @audit
        while !@shutdown
          @connectionsMutex.synchronize {
            puts "start @con.size=#{@connections.size}"
            while @connections.size >= @maxConnections
              @connectionsCV.wait(@connectionsMutex)
            end
          }
          client, port, close, content = @server.accept
          Thread.new(client,port,close,content) { |myClient, myPort, myClose, myContent|
            @connectionsMutex.synchronize {
              @connections << Thread.current
            }
            begin
              serve(myClient,myContent) if !@audit or connecting(myClient)
              puts "finished serve"
            rescue => detail
              error(detail) if @debug
            ensure
              begin
                if myClose
                  myClient.close
                end
              rescue
              end
              @connectionsMutex.synchronize {
                @connections.delete(Thread.current)
                @connectionsCV.signal
              }
              disconnecting(myPort) if @audit
            end
          }
        end
      rescue => detail
        error(detail) if @debug
      ensure
        begin
          @server.close
        rescue
        end
        if @shutdown
          @connectionsMutex.synchronize {
            while @connections.size > 0
              @connectionsCV.wait(@connectionsMutex)
            end
          }
        else
          @connections.each { |c| c.raise "stop" }
        end
        @serverThread = nil
        @@servicesMutex.synchronize {
          @@services[@host][@port].delete(@trans)
        }
        stopping if @audit
      end
    }
    self
  end
end

class ContentTCPServer
  def initialize( host, port )
    @server = TCPServer.new(host,port)
  end

  def accept
    client = @server.accept
    return [client,client.peeraddr[1],true,client.gets(nil)]
  end

  def close
    @server.close
  end
end

class ContentUDPServer
  def initialize( host, port )
    puts "init"
    @socket = UDPSocket.new
    puts "new s: #{@socket} on #{host}:#{port}"
    @socket.bind(host, port)
    puts "bound: #{@socket}"
  end

  def accept
    puts "accept"
    packet = @socket.recvfrom(1024)
    return [@socket, 0, false, packet[0]]
  end

  def close
    @socket.close
  end
end

Lovely! Who says Java can't be translated into Ruby? I even used duck typing!
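One thing the UDP half glosses over: recvfrom hands back the sender's address along with the data, and ContentUDPServer#accept keeps only packet[0]. If serve ever needs to answer whoever sent the packet, something like the sketch below is probably the shape of it. This is a standalone illustration rather than part of UServer; the helper name udp_round_trip, the upcased "reply" and the two-second timeout are all invented for the example.

```ruby
require "socket"

# Hypothetical helper: sends one datagram over loopback, has a "server"
# socket answer the sender with the upcased text, and returns the reply.
def udp_round_trip(message)
  server = UDPSocket.new
  server.bind("127.0.0.1", 0)          # port 0: let the OS pick a free port
  port = server.addr[1]

  client = UDPSocket.new
  client.send(message, 0, "127.0.0.1", port)

  # Assumed 2 second timeout so a lost packet cannot hang the example.
  raise "no request arrived" unless IO.select([server], nil, nil, 2)
  data, sender = server.recvfrom(1024)
  # sender looks like ["AF_INET", port, hostname, numeric_ip]
  server.send(data.upcase, 0, sender[3], sender[1])

  raise "no reply arrived" unless IO.select([client], nil, nil, 2)
  reply, _from = client.recvfrom(1024)
  reply
ensure
  server.close if server
  client.close if client
end
```

Calling udp_round_trip("ping") should hand back "PING". The point is only that the second element returned by recvfrom is what you would need to pass along to serve if a reply is ever wanted.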
Anyway, see if you can spot the significant difference in thread handling between this and the original GServer.
Well, OK, here it is:
GServer says:
    @connections << Thread.new(client) { |myClient|
      ...
    }
I say:
    Thread.new(client,port,close,content) { |myClient, myPort, myClose, myContent|
      @connectionsMutex.synchronize {
        @connections << Thread.current
      }
      ...
    }
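What happened with the GServer approach was that UDP packets were handled so quickly that the request thread had (mostly) finished before the main thread got around to placing it in the @connections list. So the thread was only added to the list after it had (supposedly) been removed from it by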
    @connectionsMutex.synchronize {
      @connections.delete(Thread.current)
      @connectionsCV.signal
    }
at the end of the request thread. Seems like a nasty race condition to me. My code just makes sure that the thread is placed into the @connections list before nasty stuff can happen.
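The two orderings can be sketched in isolation. This is a simplified illustration, not GServer's or UServer's actual code: the method names are made up, and the first one uses join to force the unlucky ordering deterministically instead of waiting for the scheduler to produce it.

```ruby
require "thread"

# Parent appends the thread to the list (the GServer-style pattern).
# The join forces the worker to finish, and "remove" itself, before
# the parent gets round to appending it.
def stale_entries_with_parent_append
  connections = []
  mutex = Mutex.new
  worker = Thread.new do
    # ... the request is handled instantly here ...
    mutex.synchronize { connections.delete(Thread.current) } # nothing to delete yet
  end
  worker.join                                  # worker is completely done
  mutex.synchronize { connections << worker }  # appended too late
  connections.size
end

# Worker registers itself before doing any work (the pattern used above).
def stale_entries_with_self_registration
  connections = []
  mutex = Mutex.new
  worker = Thread.new do
    mutex.synchronize { connections << Thread.current }
    begin
      # ... the request is handled here ...
    ensure
      mutex.synchronize { connections.delete(Thread.current) }
    end
  end
  worker.join
  connections.size
end
```

The first method returns 1: a dead thread is left in the list, and enough of those would eventually make the accept loop think it is at maxConnections. The second returns 0, because the add and the delete both happen inside the worker and so cannot be reordered relative to each other.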
Am I right? You tell me…


