Select Git revision

cleemy desu wayo authored
outvoke.rb 38.49 KiB
#!/usr/bin/env ruby
#
# ==========================================================================
# Outvoke -- version 0.0.99.20250725
#
# written by cleemy desu wayo / Licensed under CC0 1.0
#
# official repository: https://gitlab.com/cleemy-desu-wayo/outvoke
# ==========================================================================
#
# * requirements:
# * Ruby 3.0 or later
#
require 'time'
require 'yaml'
require 'json'
require 'bigdecimal'
require 'bigdecimal/util'
require 'bigdecimal/math'
#
# Core object of Outvoke.
#
# Keeps the data sources (@ds, keyed by label), the hooks registered against
# them, and runs the polling loop (#mainloop) that hands every event of every
# enabled data source to the matching hooks.
#
class Outvoke
  attr_accessor :version, :ds, :hooks, :hooks_first_time, :wait, :nodup_list,
                :mutex_nodup, :init_time, :ds_thread_group, :hookcc_thread_group,
                :default_output_mode
  attr_reader :last_msgid

  alias_method :sources, :ds # eventually "sources" will be removed

  def initialize
    @version = Struct.new(:branch, :body).new(
      '0.1',
      '0.0.99.20250725'
    )
    @ds = Hash.new
    @hooks = []             # hooks used for regular events
    @hooks_first_time = []  # hooks used while a data source is in its first-time log check
    @wait = 0.5             # seconds slept at the end of every mainloop round
    @mutex_msgid = Mutex.new
    @last_msgid = 0
    @hookcnt = 0
    @nodup_list = Hash.new  # condition => expiration Time
    @mutex_nodup = Mutex.new
    @init_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    @ds_thread_group = ThreadGroup.new
    @hookcc_thread_group = ThreadGroup.new
    @default_output_mode = nil
    @is_quiet_mode = false
  end

  # Returns the data source registered under the label +str+ (nil if none).
  def [](str)
    @ds[str]
  end

  def quiet_mode?
    @is_quiet_mode
  end

  def quiet_mode=(x)
    @is_quiet_mode = x
  end

  # Seconds (monotonic clock) since this object was created.
  def elapsed
    Process.clock_gettime(Process::CLOCK_MONOTONIC) - @init_time
  end

  #
  # TODO: experimental and incomplete implementation
  #
  # (specifications are subject to sudden change without notice)
  #
  # Registers +cond+ in the nodup list for +time+ seconds.
  #
  # Without a block, only the registration is done and nil is returned.
  # With a block, the block is called with (time, cond) only when no
  # unexpired entry of the list matched +cond+ (via ===) before this call;
  # the registration happens in either case.
  #
  def nodup(time, cond)
    unless block_given?
      @mutex_nodup.synchronize { register_nodup(time, cond) }
      return
    end

    is_nodup = @mutex_nodup.synchronize do
      checked = sweep_and_check_nodup(cond)
      register_nodup(time, cond)
      checked
    end
    return unless is_nodup

    yield time, cond
  end

  # Returns true when no unexpired entry of the nodup list matches +x+.
  # Expired entries are removed as a side effect.
  def nodup?(x)
    @mutex_nodup.synchronize do
      sweep_and_check_nodup(x)
    end
  end

  #
  # TODO: experimental and incomplete implementation
  #
  # (specifications are subject to sudden change without notice)
  #
  # Pushes a message to the data source labelled "lo". Always returns nil.
  #
  def loputs(body = "", attachment = nil)
    @ds['lo'] << { "body" => body.to_s.dup, "attachment" => attachment }
    nil
  end

  #
  # Polls every enabled data source forever: for each one, calls
  # before_each_event, feeds each event to the hooks, then calls
  # after_each_event. Sleeps @wait seconds after every round.
  #
  # Calls `exit 0` when "stdin" is the only enabled data source and it has
  # nothing left to deliver (see #only_exhausted_stdin?).
  #
  def mainloop
    loop do
      enabled_ds_list = []

      @ds.each do |_ds_label, ds|
        next unless ds.enabled?

        enabled_ds_list << ds
        ds.before_each_event
        ds.each_event do |event|
          hooks = @hooks
          @hookcnt = 0
          if ds.is_first_time_log_check
            ds.first_time_log_check(ds, event)
            hooks = @hooks_first_time
          end
          hooks.each do |hook|
            hooks_exec(ds, hook, event)
          end
        end
        ds.after_each_event
      end

      # when only data source "stdin" is enabled, terminate Outvoke itself if necessary
      exit 0 if only_exhausted_stdin?(enabled_ds_list)

      sleep @wait
    end
  end

  #
  # Registers +block+ as a hook for every data source selected by +ds_cond+
  # and enables those data sources. +hookby+ is the name of the DSL method
  # used; names ending with "ft" go to the first-time hook list.
  #
  def register_proc(ds_cond, event_cond, hookby, &block)
    hooks = hookby.end_with?("ft") ? @hooks_first_time : @hooks

    extract_ds(ds_cond).each do |ds_label|
      @ds[ds_label].enabled = true
      hooks << {
        'ds_label'   => ds_label,
        'event_cond' => event_cond,
        'hookby'     => hookby,
        'proc'       => block
      }
    end
  end

  # extracts only the labels of the appropriate data sources from $sources
  #
  # +ds_cond+ may be true (all labels), a single condition, or a collection
  # of conditions; each condition is compared with every label using ===.
  def extract_ds(ds_cond)
    return @ds.keys if ds_cond == true

    ds_cond_array = ds_cond.respond_to?(:each) ? ds_cond : [ds_cond]

    result = []
    ds_cond_array.each do |label_cond|
      next unless label_cond.respond_to?(:===)

      # TODO (automatically generate a new data source)
      if label_cond.is_a?(String)
        m = label_cond.match(/\Aauxin:(fifo)?:([1-9]+[0-9]*)\z/)
        if m
          new_ds_label = "auxin:fifo:#{m[2]}"
          unless @ds[new_ds_label]
            @ds[new_ds_label] = OutvokeDataSourceAuxinFifo.new(self, new_ds_label)
          end
        end
      end

      result.concat(@ds.keys.select { |label| label_cond === label })
    end

    result.uniq
  end

  # Returns the next message id (a counter incremented under a mutex).
  def generate_new_msgid
    @mutex_msgid.synchronize do
      @last_msgid += 1
    end
  end

  # Applies OUTVOKE_QUIET_MODE, OUTVOKE_OUTPUT_MODE and OUTVOKE_WAIT.
  def read_env_settings
    if ENV["OUTVOKE_QUIET_MODE"] == "1"
      @is_quiet_mode = true
    end

    ENV["OUTVOKE_OUTPUT_MODE"]&.then do
      @default_output_mode = _1
    end

    ENV["OUTVOKE_WAIT"]&.then do
      @wait = _1.to_f
    end
  end

  #
  # Copies every environment variable named OUTVOKE_VAR_XXX into the global
  # variable $xxx (name downcased). A few reserved names are skipped.
  #
  def read_env_vars
    ENV.filter{ _1 =~ /\AOUTVOKE_VAR_[A-Z][_A-Z0-9]*\z/ }.each do |key, value|
      var_name = key.sub(/\AOUTVOKE_VAR_/, "").downcase
      next if ["stdin", "stdout", "stderr", "outvoke", "o", "ds"].include?(var_name)

      # The value travels through @tmp so that only the variable name is
      # interpolated into the evaluated code; the filter above limits that
      # name to letters, digits and underscores.
      instance_variable_set("@tmp", value)
      eval "$#{var_name} = @tmp"
    end
  end

  private

  #
  # Returns true when +enabled_ds_list+ consists of a single
  # OutvokeDataSourceStdin that reached EOF with no pending lines, and no
  # other thread remains in the default thread group or in this object's
  # hookcc thread group.
  #
  def only_exhausted_stdin?(enabled_ds_list)
    return false unless enabled_ds_list.length == 1

    ds = enabled_ds_list[0]
    return false unless ds.is_a?(OutvokeDataSourceStdin)
    return false unless ds.eof_reached? && ds.log_lines.empty? && ds.stdin_new_lines.empty?

    default_thread_list = ThreadGroup::Default.list - [Thread.current]

    # use this instance's own thread group rather than the $outvoke global,
    # which is only assigned when the file is run as a command
    default_thread_list.empty? && @hookcc_thread_group.list.empty?
  end

  #
  # TODO: experimental and incomplete implementation
  #
  # (specifications are subject to sudden change without notice)
  #
  # Sets the expiration of +obj+ (and of every existing key matching it via
  # ===) to +time+ seconds from now.
  #
  def register_nodup(time, obj)
    expiration_time = Time.now + time

    @nodup_list.each_key do |key|
      @nodup_list[key] = expiration_time if key === obj
    end

    @nodup_list[obj] = expiration_time
  end

  def sweep_and_check_nodup(x)
    tmp_now = Time.now

    # sweep (remove expired items)
    @nodup_list = @nodup_list.reject do |_, exp|
      tmp_now > exp
    end

    # immediately after sweep, the list can be considered as a simple blocklist
    @nodup_list.each_key.none? { |key| key === x }
  end

  #
  # Evaluates one event condition against +event+. Returns the value to
  # store in event.m (truthy when the condition matched).
  #
  def match_event_cond(event_cond, event)
    if event_cond == true
      [event.body]
    elsif event_cond.is_a?(String)
      [event.body] if event.body.start_with?(event_cond)
    elsif event_cond.respond_to?(:match)
      event_cond.match(event.body)
    elsif event_cond.respond_to?(:call)
      proc_result = event_cond.call(event)
      if proc_result
        if proc_result.respond_to?(:each)
          proc_result.each
        elsif proc_result.respond_to?(:to_a)
          proc_result.to_a
        else
          [proc_result]
        end
      end
    end
  end

  #
  # Runs +hook+ for +event+ if the hook belongs to +ds+ and one of its event
  # conditions matches: pre procs, then the hook's proc (instance_exec'ed on
  # the context generated by the data source), then post procs.
  #
  def hooks_exec(ds, hook, event)
    # check data source
    return unless hook['ds_label'] == ds.label

    event_cond_array = [hook['event_cond']]
    if hook['event_cond'].respond_to?(:each)
      event_cond_array = hook['event_cond']
    end

    event.is_quiet_mode = @is_quiet_mode # TODO (is_quiet_mode is deprecated)
    event.status.quiet_mode = @is_quiet_mode

    # check condition (the first condition that matches wins)
    event_cond_array.detect do |event_cond|
      event.m = nil
      event.m = match_event_cond(event_cond, event)
    end
    return unless event.m

    event.m = event.m.to_a # with YAML.unsafe_load and to_yaml in mind
    event.hookby = hook["hookby"]
    @hookcnt += 1
    event.hookcnt = @hookcnt

    hook_result = nil
    hook_result_for_post_process = nil

    # pre process
    ds.pre_procs.each do |proc|
      proc.call(event)
    end

    # execute
    if hook['proc']
      tmp_context = ds.generate_context(event)
      hook_result = tmp_context.instance_exec(event, &hook['proc'])
    else
      hook_result_for_post_process = event
    end

    # true means "pass the event itself", nil/false mean "nothing new to pass"
    if hook_result == true
      hook_result_for_post_process = event
    elsif hook_result.nil? || hook_result == false
      # nop
    else
      hook_result_for_post_process = hook_result
    end

    # post process
    ds.post_procs.each do |proc|
      if proc.arity == 2
        proc.call(event, hook_result_for_post_process)
      elsif proc.arity == 3
        proc.call(event, hook_result_for_post_process, event.hookby)
      else
        raise "ERROR: invalid Proc was found in post_procs (ds: \"#{ds.label}\", proc.arity: #{proc.arity})"
      end
    end
  end
end
#
# this class provided only for instance_exec
#
class OutvokeProcExecContext
  # outvoke: object answering extract_ds, ds and []
  # ds:      the data source the event came from
  # event:   the event being processed; a copy of its body is kept for #to
  def initialize(outvoke, ds, event)
    @outvoke = outvoke
    @ds = ds
    @event = event
    @event_body = @event.body.dup
  end

  # Pushes the body of the current event (as it was when this context was
  # created) to every data source selected by ds_cond. Always returns nil.
  def to(ds_cond, attachment = nil)
    @outvoke.extract_ds(ds_cond).each do |target_label|
      target = @outvoke.ds[target_label]
      target << { "body" => @event_body.to_s.dup, "attachment" => attachment }
    end
    nil
  end

  # The event being processed.
  def e
    @event
  end

  # Pushes x (the current event by default) to the data source "lo".
  # Always returns nil.
  def loputs(x = @event)
    message = { "body" => x.to_s.dup, "attachment" => nil }
    @outvoke["lo"] << message
    nil
  end
end
#
# Refinements providing the hook DSL (hook, hookft, hookcc, ...).
# Every method works on the global $outvoke.
#
module OutvokeDSL
  refine Kernel do
    def hook(ds_cond, event_cond = true, &block)
      $outvoke.register_proc(ds_cond, event_cond, "hook", &block)
    end

    def hookft(ds_cond, event_cond = true, &block)
      $outvoke.register_proc(ds_cond, event_cond, "hookft", &block)
    end

    # if block is given, execute in another thread (concurrent computing)
    def hookcc(ds_cond, event_cond = true, &block)
      register_hookcc(ds_cond, event_cond, "hookcc", &block)
    end

    # if block is given, execute in another thread (concurrent computing)
    def hookccft(ds_cond, event_cond = true, &block)
      register_hookcc(ds_cond, event_cond, "hookccft", &block)
    end

    #
    # Shared implementation of hookcc / hookccft.
    #
    # With a block: registers a wrapper hook that deep-copies the event,
    # runs the block in a new thread (added to $outvoke.hookcc_thread_group)
    # and calls the post procs of the data source from that thread. The
    # wrapper itself returns nil.
    #
    # Without a block: there is nothing to run concurrently, so the hook is
    # registered as its non-concurrent counterpart ("hookcc" -> "hook",
    # "hookccft" -> "hookft"), which keeps first-time hooks in the
    # first-time hook list.
    #
    def register_hookcc(ds_cond, event_cond, hookby, &block)
      if block_given?
        $outvoke.register_proc(ds_cond, event_cond, hookby) do |e; tmp_event|
          tmp_event = YAML.unsafe_load(e.to_yaml) # deep copy

          $outvoke.hookcc_thread_group.add Thread.start {
            Thread.current["hookcc_result"] = instance_exec(tmp_event , &block)

            # same mapping as for regular hooks: true means "the event itself"
            if Thread.current["hookcc_result"] == true
              Thread.current["hook_result_for_post_process"] = tmp_event
            elsif Thread.current["hookcc_result"].nil? || Thread.current["hookcc_result"] == false
              Thread.current["hook_result_for_post_process"] = nil
            else
              Thread.current["hook_result_for_post_process"] = Thread.current["hookcc_result"]
            end

            # post process
            # (@ds is read from the object this wrapper is instance_exec'ed on)
            @ds.post_procs.each do |proc|
              if proc.arity == 2
                proc.call(e, Thread.current["hook_result_for_post_process"])
              elsif proc.arity == 3
                proc.call(e, Thread.current["hook_result_for_post_process"], hookby)
              else
                raise "ERROR: invalid Proc was found in post_procs (ds: \"#{@ds.label}\", proc.arity: #{proc.arity})"
              end
            end
          }

          nil
        end
      else
        $outvoke.register_proc(ds_cond, event_cond, hookby.sub("cc", ""))
      end
    end

    def hooklo(event_cond = true, &block)
      hook("lo", event_cond, &block)
    end

    def hookstd(event_cond = true, &block)
      hook("stdin", event_cond, &block)
    end

    def hookaux(event_cond = true, &block)
      hook("auxin:fifo:1", event_cond, &block)
    end

    def hooksec(event_cond = true, &block)
      hook("every-sec", event_cond, &block)
    end

    def hookweb(event_cond = true, &block)
      hook("web-001", event_cond, &block)
    end

    def hookvr(event_cond = true, &block)
      hook(/^vr/, event_cond, &block)
    end

    def loputs(x = "", attachment = nil)
      $outvoke.loputs(x, attachment)
    end

    #
    # TODO: experimental and incomplete implementation
    #
    # (specifications are subject to sudden change without notice)
    #
    def nodup(time, cond)
      raise '$outvoke is not an Outvoke object' unless $outvoke.is_a?(Outvoke)

      if block_given?
        $outvoke.nodup(time, cond) { |t, c| yield t, c }
      else
        $outvoke.nodup(time, cond)
      end
    end

    def nodup?(x)
      raise '$outvoke is not an Outvoke object' unless $outvoke.is_a?(Outvoke)

      $outvoke.nodup?(x)
    end
  end

  #
  # TODO: experimental and incomplete implementation
  #
  # (specifications are subject to sudden change without notice)
  #
  refine BasicObject do
    # Pushes the string form of the receiver to every data source selected
    # by ds_cond. Always returns nil.
    def to(ds_cond, attachment = nil)
      $outvoke.extract_ds(ds_cond).map{ $outvoke.ds[_1] }.each do |ds|
        ds << { "body" => self.to_s.dup, "attachment" => attachment }
      end
      nil
    end
  end
end
#
# An event delivered to hooks. It is a String (the event body) carrying
# extra attributes such as the message id, the time and the match data.
#
class OutvokeEvent < String
  attr_accessor :status, :msgid, :time, :attachment, :m
  attr_accessor :hookby, :hookcnt
  attr_accessor :is_quiet_mode # deprecated

  # x is converted with to_s and becomes the body of the event
  def initialize(x)
    super(x.to_s)
    @time = Time.now
    @hookcnt = 0
  end

  # the body of the event as returned by to_s
  def body
    to_s
  end
end
#
# Base class of all data sources.
#
# A data source buffers incoming messages and, once per mainloop round,
# yields them as events (each_event). Subclasses must implement each_event,
# before_each_event, after_each_event and first_time_log_check, and must set
# @status before assigning output_mode.
#
class OutvokeDataSource
  attr_accessor :label, :status, :log_lines, :mutex_lo, :is_first_time_log_check,
                :pre_procs, :post_procs
  attr_reader :lo_data, :preset_post_procs

  def initialize(outvoke, label)
    @outvoke = outvoke
    @label = label
    @status = nil
    @log_lines = []
    @lo_data = []
    @mutex_lo = Mutex.new
    @is_first_time_log_check = false
    @is_enabled = false

    #
    # Proc objects for pre process
    # (Procs to process before the execution of a Proc registered by hook)
    #
    @pre_procs = []

    #
    # Proc objects for post process
    # (Procs to process the value returned by Proc registered by hook)
    #
    @post_procs = []

    #
    # Factories of post procs, keyed by output mode name. Each factory takes
    # the mode arguments and returns the Proc to append to @post_procs.
    #
    @preset_post_procs = Hash.new

    # prints the hook result; one line prefixed with a time unless in quiet mode
    @preset_post_procs["terminal"] = ->(*_args) {
      ->(e, hook_result, hookby) {
        return hook_result unless hook_result

        if e.status.quiet_mode
          puts hook_result.to_s
        else
          puts "[#{preset_time_str(e, hookby)}] #{hook_result}".gsub("\n", " ")
        end
        hook_result
      }
    }

    # prints the hook result as one JSON document
    # (a Hash result without a time entry gets one added under "time")
    @preset_post_procs["jsonl"] = ->(*_args) {
      ->(e, hook_result, hookby) {
        return hook_result unless hook_result

        if hook_result.is_a?(Hash) && hook_result["time"].nil? && hook_result[:time].nil?
          hook_result["time"] = preset_time_str(e, hookby)
        end
        puts hook_result.to_json
        hook_result
      }
    }

    # prints the hook result as YAML
    # (a Hash result without a time entry gets one added under "time")
    @preset_post_procs["yaml"] = ->(*_args) {
      ->(e, hook_result, hookby) {
        return hook_result unless hook_result

        if hook_result.is_a?(Hash) && hook_result["time"].nil? && hook_result[:time].nil?
          hook_result["time"] = preset_time_str(e, hookby)
        end
        puts hook_result.to_yaml
        hook_result
      }
    }

    # sends the hook result as OSC message(s); args: [server address, server port]
    @preset_post_procs["osc"] = ->(*args) {
      require 'osc-ruby'

      if args.length == 0 || args[0].nil? || args[0] == ""
        osc_server_addr = "localhost"
      else
        osc_server_addr = args[0].to_s
      end

      if args.length <= 1 || args[1].nil? || args[1] == ""
        osc_server_port = 9000
      else
        osc_server_port = args[1].to_i
      end

      ->(_e, hook_result, _hookby) {
        return hook_result unless hook_result
        return hook_result unless hook_result.respond_to?(:each)

        osc_client = OSC::Client.new(osc_server_addr, osc_server_port)

        # TODO: hook_result may be overwritten by other threads
        Thread.start do
          sleep_sec = 0

          # a single message is wrapped so that both forms are handled alike
          osc_msgs = [hook_result]
          if hook_result.each.first.is_a?(Array)
            osc_msgs = hook_result
          end

          osc_msgs.each do |msg|
            sleep_sec = 0
            if msg[0].nil?
              # no address: the entry only requests a pause
              sleep [msg[-1].to_f, 0].max
            else
              # with 3 or more elements, the last one is the pause after sending
              if msg.length >= 3
                sleep_sec = msg.pop
              end
              if sleep_sec && sleep_sec.respond_to?(:to_f)
                osc_client.send(OSC::Message.new(*msg.flatten))
                sleep [sleep_sec.to_f, 0].max
              end
            end
          end
        end
        hook_result
      }
    }

    @post_procs << @preset_post_procs["terminal"].call
  end

  def enabled?
    @is_enabled
  end

  def enabled=(x)
    @is_enabled = !!x
  end

  #
  # Queues the message x (a Hash) after storing a new message id under its
  # "msgid" key. Returns that message id.
  #
  def <<(x)
    new_msgid = @outvoke.generate_new_msgid
    x["msgid"] = new_msgid
    @mutex_lo.synchronize do
      @lo_data << x
    end
    new_msgid
  end

  def output_mode
    @status.output_mode
  end

  #
  # Selects the post procs from the presets.
  #
  # mode is either a String of preset names joined with "+" ("jsonl+osc"),
  # or an Array [preset name, arg, ...]. A leading "+" appends to the
  # current post procs; otherwise the current ones are replaced.
  #
  # Raises RuntimeError for an empty mode or an unknown preset name.
  #
  def output_mode=(mode)
    raise "ERROR: invalid mode" if (!mode) || mode.empty? || mode == "+"

    is_mode_reset = false
    procs = []

    if mode.is_a?(Array)
      procs[0] = Hash.new
      procs[0]["proc_label"] = mode[0].to_s
      procs[0]["mode_str"] = mode.join(":")
      procs[0]["args"] = mode.drop(1)

      if procs[0]["proc_label"].start_with?("+")
        procs[0]["proc_label"] = procs[0]["proc_label"].sub("+", "")
      else
        is_mode_reset = true
      end
    else
      is_mode_reset = true if not mode.start_with?("+")

      mode.split("+").each do |label|
        next if label == ""

        procs << {
          "proc_label" => label,
          "mode_str" => label,
          "args" => [],
        }
      end
    end

    if is_mode_reset
      @post_procs = []
      @status.output_mode = ""
    end

    procs.each do |pr|
      next unless pr
      next unless pr["proc_label"]
      next unless pr["proc_label"] != ""

      if @preset_post_procs[pr["proc_label"]].respond_to?(:call)
        @post_procs << @preset_post_procs[pr["proc_label"]].call(*pr["args"])
        if @status.output_mode == ""
          @status.output_mode = pr["mode_str"]
        else
          @status.output_mode += "+#{pr["mode_str"]}"
        end
      else
        raise "ERROR: preset_post_procs[\"#{pr["proc_label"]}\"] is not callable"
      end
    end
  end

  def each_event
    raise NotImplementedError
  end

  def before_each_event
    raise NotImplementedError
  end

  def after_each_event
    raise NotImplementedError
  end

  # Accepts the same arguments as the overrides in the subclasses (and none
  # at all, as before), so a missing override is reported as
  # NotImplementedError rather than as an argument count error.
  def first_time_log_check(_ds = nil, _event = nil)
    raise NotImplementedError
  end

  # Returns the object on which the Proc of a hook is instance_exec'ed.
  def generate_context(event)
    OutvokeProcExecContext.new(@outvoke, self, event)
  end

  private

  # Time string used by the preset post procs: the time of the event for
  # "hook", the current time for "hookcc", "" for any other hookby.
  def preset_time_str(event, hookby)
    if hookby == "hook"
      event.time.to_s
    elsif hookby == "hookcc"
      Time.now.to_s
    else
      ""
    end
  end
end
#
# TODO: experimental and incomplete implementation
#
# (specifications are subject to sudden change without notice)
#
# status object shared by all events of the data source "stdin"
StructDSStdinStatus = Struct.new(:now, :quiet_mode, :output_mode, :last_time)
#
# Data source that turns every line read from $stdin into an event.
#
# A reader thread collects the lines into @stdin_new_lines; the mainloop
# picks them up in before_each_event.
#
class OutvokeDataSourceStdin < OutvokeDataSource
attr_reader :stdin_new_lines
def initialize(outvoke, label)
super
@status = StructDSStdinStatus.new
@status.last_time = Time.now.floor
self.output_mode = "terminal"
@stdin = $stdin.each_line
@stdin_new_lines = []
@mutex_stdin = Mutex.new
@is_eof_reached = false
@thread_stdin = Thread.start do
# the thread parks itself right away; enabled= wakes it up with Thread#run
Thread.stop
loop do
Thread.stop unless @is_enabled
begin
Thread.current["tmp"] = @stdin.next
rescue StopIteration
# no more lines: remember EOF and leave the reader loop
@is_eof_reached = true
break
end
# checked again because the data source may have been disabled while
# the thread was waiting for a line
Thread.stop unless @is_enabled
Thread.current["new_msgid"] = @outvoke.generate_new_msgid
@mutex_stdin.synchronize do
@stdin_new_lines << {
"msgid" => Thread.current["new_msgid"],
"body" => Thread.current["tmp"].chomp
}
end
end
end
@outvoke.ds_thread_group.add @thread_stdin
end
# Enables or disables the data source. When it becomes enabled, the reader
# thread is woken up. Returns true if the data source is enabled afterwards
# and false if it is disabled or Thread#run raised ThreadError.
def enabled=(x)
return true if @is_enabled == true && x
@is_enabled = !!x
return false unless @is_enabled
sleep 0.05 # TODO
tmp_status = @thread_stdin.status
if tmp_status == "run" || tmp_status == "sleep"
begin
@thread_stdin.run
rescue ThreadError
return false
end
end
true
end
# Yields one OutvokeEvent per line collected by before_each_event.
# @status is shared by all events; its last_time is updated after each yield.
def each_event
@log_lines.each do |line|
event = OutvokeEvent.new(line["body"])
event.msgid = line["msgid"]
event.status = @status
event.status.now = event.time
yield event
@status.last_time = event.time
end
self
end
# Moves the lines read so far from @stdin_new_lines to @log_lines
# (under the mutex shared with the reader thread).
def before_each_event
@log_lines = []
return unless @is_enabled
@mutex_stdin.synchronize do
@log_lines = @stdin_new_lines
@stdin_new_lines = []
end
end
def after_each_event
nil
end
def first_time_log_check(ds, event)
nil
end
# true once the reader thread has hit the end of $stdin
def eof_reached?
@is_eof_reached
end
end
#
# TODO: experimental and incomplete implementation
#
# (specifications are subject to sudden change without notice)
#
# status object shared by all events of an "auxin:fifo:N" data source
StructDSAuxinStatus = Struct.new(:now, :quiet_mode, :output_mode, :last_time)

#
# Data source that turns every line written to a named pipe into an event.
#
# The label must look like "auxin:fifo:N"; the pipe is the file
# "./.outvoke.auxin.N.fifo", created by enabled= when it does not exist.
# A reader thread collects the lines into @new_lines; the mainloop picks
# them up in before_each_event.
#
class OutvokeDataSourceAuxinFifo < OutvokeDataSource
  # pause between two checks while the FIFO path is a symlink
  SYMLINK_RETRY_INTERVAL = 0.5

  # Raises ArgumentError when label is not of the form "auxin:fifo:N".
  def initialize(outvoke, label)
    super
    @status = StructDSAuxinStatus.new
    @status.last_time = Time.now.floor
    self.output_mode = "terminal"
    @new_lines = []
    @mutex_new_lines = Mutex.new

    m = label.match(/\Aauxin:fifo:([1-9]+[0-9]*)\z/)
    raise ArgumentError, "invalid label for auxin fifo: #{label.inspect}" unless m

    @fifo_file_name = "./.outvoke.auxin.#{m[1]}.fifo"

    @thread_auxin = Thread.start do
      # the thread parks itself right away; enabled= wakes it up with Thread#run
      Thread.stop

      loop do
        # a symlink is never opened; wait instead of spinning on the check
        if File.symlink?(@fifo_file_name)
          sleep SYMLINK_RETRY_INTERVAL
          next
        end

        # block form: the handle is closed before the FIFO is opened again
        File.open(@fifo_file_name) do |fifo|
          @fifo_file = fifo.each_line

          # Enumerator#next raises StopIteration at EOF, which ends this
          # inner loop; the outer loop then reopens the FIFO
          loop do
            Thread.stop unless @is_enabled
            Thread.current["tmp"] = @fifo_file.next
            Thread.stop unless @is_enabled

            Thread.current["new_msgid"] = @outvoke.generate_new_msgid
            @mutex_new_lines.synchronize do
              @new_lines << {
                "msgid" => Thread.current["new_msgid"],
                "body" => Thread.current["tmp"].chomp
              }
            end
          end
        end
      end
    end
    @outvoke.ds_thread_group.add @thread_auxin
  end

  #
  # Enables or disables the data source. When it becomes enabled, the FIFO
  # is created if nothing exists at its path and the reader thread is woken
  # up. Returns true if the data source is enabled afterwards, false
  # otherwise (disabled, path is not a FIFO, or Thread#run raised
  # ThreadError).
  #
  def enabled=(x)
    return true if @is_enabled == true && x

    @is_enabled = !!x
    return false unless @is_enabled

    sleep 0.05 # TODO

    # create fifo file (named pipe)
    # NOTE(review): mode 0606 grants read/write to "other" but not to the
    # group; confirm that this is the intended permission
    unless File.pipe?(@fifo_file_name)
      unless File.exist?(@fifo_file_name)
        File.mkfifo(@fifo_file_name, 0606)
      end
    end

    # something else than a FIFO occupies the path: stay disabled
    unless File.pipe?(@fifo_file_name)
      @is_enabled = false
      return false
    end

    tmp_status = @thread_auxin.status
    if tmp_status == "run" || tmp_status == "sleep"
      begin
        @thread_auxin.run
      rescue ThreadError
        return false
      end
    end
    true
  end

  # Yields one OutvokeEvent per line collected by before_each_event.
  def each_event
    @log_lines.each do |line|
      event = OutvokeEvent.new(line["body"])
      event.msgid = line["msgid"]
      event.status = @status
      event.status.now = event.time
      yield event
      @status.last_time = event.time
    end
    self
  end

  # Moves the lines read so far from @new_lines to @log_lines
  # (under the mutex shared with the reader thread).
  def before_each_event
    @log_lines = []
    return unless @is_enabled

    @mutex_new_lines.synchronize do
      @log_lines = @new_lines
      @new_lines = []
    end
  end

  def after_each_event
    nil
  end

  def first_time_log_check(ds, event)
    nil
  end
end
#
# TODO: experimental and incomplete implementation
#
# (specifications are subject to sudden change without notice)
#
# status object shared by all events of the loopback data source
StructDSLoopbackStatus = Struct.new(:now, :quiet_mode, :output_mode, :last_time)

#
# Data source fed from inside the program: messages queued with << are
# yielded as events on the next mainloop round.
#
class OutvokeDataSourceLoopback < OutvokeDataSource
  def initialize(outvoke, label)
    super
    @status = StructDSLoopbackStatus.new
    @status.last_time = Time.now.floor
    self.output_mode = "terminal"
  end

  # Yields one OutvokeEvent (with its attachment) per queued message taken
  # over by before_each_event. Returns self.
  def each_event
    @log_lines.each do |entry|
      ev = OutvokeEvent.new(entry["body"])
      ev.msgid = entry["msgid"]
      ev.attachment = entry["attachment"]
      ev.status = @status
      ev.status.now = ev.time

      yield ev

      @status.last_time = ev.time
    end

    self
  end

  # Takes over the queued messages as @log_lines and starts a new queue.
  def before_each_event
    @log_lines = []
    @mutex_lo.synchronize {
      @log_lines = @lo_data
      @lo_data = []
    }
  end

  def after_each_event = nil

  def first_time_log_check(ds, event) = nil
end
#
# TODO: experimental and incomplete implementation
#
# (specifications are subject to sudden change without notice)
#
# status object shared by all events of a web data source
StructDSWebStatus = Struct.new(:now, :quiet_mode, :output_mode, :last_time, :ds_label)

# Event of OutvokeDataSourceWeb: the body is the request path; req and res
# are the request and response objects, get and post the parameters.
class OutvokeEventWeb < OutvokeEvent
  attr_accessor :req, :get, :post, :res
end

#
# Data source that turns every HTTP request received by a WEBrick server
# into an event. The request handler queues the request, then waits until
# a hook has registered a response for it (or until @timeout seconds have
# passed, which yields a 500 response).
#
class OutvokeDataSourceWeb < OutvokeDataSource
  # pause between two lookups of the response in the request handler
  RESPONSE_POLL_INTERVAL = 0.01

  attr_accessor :port, :document_root, :mount_proc_dir, :timeout
  attr_reader :first_res_list

  def initialize(outvoke, label)
    super
    @status = StructDSWebStatus.new
    @status.last_time = Time.now.floor

    # post proc that stores the hook result as the response of the request
    @preset_post_procs["web"] = ->(*_args) {
      ->(e, hook_result) {
        register_first_res(e.msgid, hook_result)
      }
    }

    self.output_mode = "web"
    @websrv = nil
    @port = 8080
    @document_root = nil
    @mount_proc_dir = "/"
    @timeout = 5                # seconds a request waits for its response
    @first_res_list = Hash.new  # msgid => first registered response
    @mutex_first_res = Mutex.new
  end

  # TODO
  def enabled=(x)
    @is_enabled = !!x
  end

  # Creates the WEBrick server and starts it in a new thread (added to the
  # data source thread group of the Outvoke object).
  def websrv_start
    require 'uri'
    require 'webrick'

    websrv = WEBrick::HTTPServer.new({:Port => @port,
                                      :DocumentRoot => @document_root})

    if @mount_proc_dir
      websrv.mount_proc @mount_proc_dir do |req, res|
        req.query # TODO
        msgid = self << { "body" => req.path.to_s.dup, "attachment" => [req, res] }

        res_found = false
        loop_start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)

        loop do
          break if loop_start_time + @timeout < Process.clock_gettime(Process::CLOCK_MONOTONIC)

          # TODO (delete from @first_res_list when done)
          tmp_res = nil
          @mutex_first_res.synchronize do
            tmp_res = @first_res_list[msgid]
          end

          if tmp_res
            if res == tmp_res
              # nop
            elsif tmp_res.is_a?(WEBrick::HTTPResponse) # maybe hookcc
              res.body = tmp_res.body
              tmp_res.header.each do |key, value|
                res.header[key] = value
              end
            else
              res.body = tmp_res.to_s
              res.body += "\n" if res.body[-1] != "\n"
            end
            res_found = true
            break
          end

          # yield the CPU between two lookups instead of spinning
          sleep RESPONSE_POLL_INTERVAL
        end

        unless res_found
          res.status = 500
          res.body = "error\n"
        end
      end
    end

    @outvoke.ds_thread_group.add Thread.start { websrv.start }
  end

  # Yields one OutvokeEventWeb per queued request. get/post are filled in
  # for GET and POST requests only.
  def each_event
    @log_lines.each do |line|
      event = OutvokeEventWeb.new(line["body"])
      event.msgid = line["msgid"]
      event.attachment = line["attachment"]
      event.req = line["attachment"][0]
      event.res = line["attachment"][1]

      if event.req.request_method == "GET"
        event.get = event.req.query
        event.post = Hash.new
      elsif event.req.request_method == "POST"
        # the query part is nil when the URI has none; to_s turns that into
        # an empty parameter list
        event.get = Hash[URI::decode_www_form(event.req.request_uri.query.to_s)]
        event.post = event.req.query
      end

      event.status = @status
      event.status.now = event.time
      event.status.ds_label = @label
      yield event
      @status.last_time = event.time
    end
    self
  end

  # Takes over the queued requests as @log_lines and starts a new queue.
  def before_each_event
    @log_lines = []
    @mutex_lo.synchronize do
      @log_lines = @lo_data
      @lo_data = []
    end
  end

  def after_each_event
    nil
  end

  def first_time_log_check(ds, event)
    nil
  end

  # Stores res as the response for msgid unless res is nil or a response
  # was already stored for that msgid (the first one wins).
  def register_first_res(msgid, res)
    return if res.nil?

    @mutex_first_res.synchronize do
      if @first_res_list[msgid].nil?
        @first_res_list[msgid] = res
      end
    end
  end

  # Like the base implementation, with an additional method res(hook_result)
  # that registers hook_result as the response of the current request.
  def generate_context(event)
    context = OutvokeProcExecContext.new(@outvoke, self, event)
    context.extend(Module.new do
      def res(hook_result)
        @ds.register_first_res(@event.msgid, hook_result)
      end
    end)
  end
end
#
# TODO: experimental and incomplete implementation
#
# (specifications are subject to sudden change without notice)
#
# status object shared by all events of an OSC data source
StructDSOscStatus = Struct.new(:now, :quiet_mode, :output_mode, :last_time, :ds_label)

# Event of OutvokeDataSourceOsc: the body is the OSC address, msg the
# content of the message as an Array.
class OutvokeEventOsc < OutvokeEvent
  attr_accessor :msg
end

#
# Data source that turns every OSC message received by an OSC::EMServer
# into an event.
#
class OutvokeDataSourceOsc < OutvokeDataSource
  attr_accessor :port

  def initialize(outvoke, label)
    super
    @status = StructDSOscStatus.new
    @status.last_time = Time.now.floor
    self.output_mode = "terminal"
    @thread_osc = nil
    @port = 9001
    @new_msgs = []
    @mutex_osc = Mutex.new
  end

  # TODO
  def enabled=(x)
    @is_enabled = !!x
  end

  # Creates the OSC server on @port and runs it in a new thread (added to
  # the data source thread group of the Outvoke object). Every received
  # message is queued in @new_msgs.
  def server_start
    require 'osc-ruby'
    require 'osc-ruby/em_server'

    oscsrv = OSC::EMServer.new(@port)

    oscsrv.add_method(nil) do |msg|
      @mutex_osc.synchronize do
        @new_msgs << {
          "msgid" => @outvoke.generate_new_msgid,
          "body" => msg.address.chomp,
          "msgbody" => msg.to_a
        }
      end
    end

    @thread_osc = Thread.start { oscsrv.run }
    @outvoke.ds_thread_group.add @thread_osc
  end

  # Yields one OutvokeEventOsc per message collected by before_each_event.
  def each_event
    @log_lines.each do |line|
      event = OutvokeEventOsc.new(line["body"])
      event.msgid = line["msgid"]
      event.msg = line["msgbody"]
      event.status = @status
      event.status.now = event.time
      yield event
      @status.last_time = event.time
    end
    self
  end

  # Moves the messages received so far from @new_msgs to @log_lines
  # (under the mutex shared with the server callback).
  def before_each_event
    @log_lines = []
    return unless @is_enabled

    @mutex_osc.synchronize do
      @log_lines = @new_msgs
      @new_msgs = []
    end
  end

  def after_each_event
    nil
  end

  # no first-time processing for OSC (same no-op as the other data sources)
  def first_time_log_check(ds, event)
    nil
  end
end
# status object shared by all events of a VRChat data source
StructDSVrchatStatus = Struct.new(:now, :quiet_mode, :output_mode, :logfile, :file_size,
                                  :last_joined_time, :elapsed, :count, :lineno)

# Event of OutvokeDataSourceVrchat: raw is the unparsed log line, type the
# second column of the log line; time is taken from the log line.
class OutvokeEventVrchat < OutvokeEvent
  attr_accessor :raw, :type
end

#
# Data source that follows the newest "output_log_*.txt" file found in
# log_dir and turns every line starting with a time stamp into an event.
#
# The first round over a log file is a "first time log check": the Outvoke
# mainloop then runs first_time_log_check and the first-time hooks instead
# of the regular hooks.
#
class OutvokeDataSourceVrchat < OutvokeDataSource
  attr_accessor :log_dir

  def initialize(outvoke, label)
    super
    @status = StructDSVrchatStatus.new
    @status.logfile = nil
    @status.file_size = 0
    @status.count = 0
    @status.lineno = 0 # number of lines of @log_lines already consumed
    @status.last_joined_time = Time.now
    self.output_mode = "terminal"
    @is_first_time_log_check = true
    @log_dir = ''
    @has_new_log_lines = false
  end

  # Yields one OutvokeEventVrchat per not yet consumed log line that starts
  # with "YYYY.MM.DD HH:MM:SS <type> <body>". Returns self once the end of
  # @log_lines is reached, nil when there was nothing new to look at.
  def each_event
    return unless @has_new_log_lines

    loop do
      cnt = 0
      line_raw = ''
      m = nil

      # advance to the next line that looks like a log entry
      @log_lines[@status.lineno..].each do |line|
        cnt += 1
        line_raw = line
        m = line.chomp.match(/\A([0-9]{4}\.[0-9]{2}\.[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}) +([-_a-zA-Z0-9]+) +(.*)\z/)
        break if m # if found
      end

      @status.lineno += cnt
      @has_new_log_lines = false
      return self unless m

      event = OutvokeEventVrchat.new(m[3].sub(/\A *- */, ''))
      event.raw = line_raw
      event.time = Time.parse(m[1]) # unlike event.status.now, a value taken from log data
      event.type = m[2]
      event.msgid = @outvoke.generate_new_msgid
      event.status = @status
      event.status.now = Time.now

      if event.body.start_with?('[Behaviour] Joining or Creating Room: ')
        event.status.last_joined_time = event.time
      end
      event.status.elapsed = event.status.now - event.status.last_joined_time

      yield event
    end

    self
  end

  # Looks for a new or grown log file and, if there is one, reads it
  # completely into @log_lines.
  def before_each_event
    tmp_now = Time.now
    has_file_changed = false

    # check if it is a new file
    latest_logfile = Dir.glob("#{@log_dir}/output_log_*.txt").last
    if @status.logfile != latest_logfile
      has_file_changed = true
      @status.logfile = latest_logfile
      @status.lineno = 0

      # ask the Outvoke object this data source belongs to, not the $outvoke global
      if !@outvoke.quiet_mode? && @status.logfile
        puts "[#{tmp_now}] [outvoke-system] #{@label}: a new log file was found."
      end
    end

    # return if not found
    unless @status.logfile
      if @is_first_time_log_check
        puts "[#{tmp_now}] [outvoke-system] #{@label}: ERROR: VRChat log file not found"
      end
      return
    end

    # return if not readable
    return unless File.readable?(@status.logfile)

    # check if file size has increased
    unless has_file_changed
      if @status.file_size < File.size(@status.logfile)
        has_file_changed = true
      end
    end

    # return if the log file has not changed
    return unless has_file_changed

    if @is_first_time_log_check
      # the file name carries the start time of the log; a name that does
      # not follow the pattern leaves last_joined_time untouched
      name_match = @status.logfile.match(
        /\A.*output_log_([0-9]{4}-[0-9]{2}-[0-9]{2})_([0-9]{2})-([0-9]{2})-([0-9]{2})\.txt\z/)
      if name_match
        @status.last_joined_time =
          Time.parse("#{name_match[1]} #{name_match[2]}:#{name_match[3]}:#{name_match[4]}")
      end
    end

    # read VRChat log file
    File.read(@status.logfile).then do |file_body|
      @log_lines = file_body.split("\n")
      @status.file_size = file_body.bytesize
    end

    @has_new_log_lines = true
  end

  # Ends the first time log check after the first round.
  def after_each_event
    if @is_first_time_log_check
      unless @outvoke.quiet_mode?
        puts "[#{Time.now}] [outvoke-system] #{@label}: first time log check has done."
      end
      @is_first_time_log_check = false
    end
  end

  # Keeps @status.count up to date while old log lines are replayed:
  # reset on "Entering Room", +1 on "OnPlayerJoined", -1 (not below 0) on
  # "OnPlayerLeft".
  def first_time_log_check(ds, event)
    return unless event.body.start_with?('[B')

    if event.body.start_with?('[Behaviour] Entering Room: ')
      @status.count = 0
    elsif event.body.start_with?('[Behaviour] OnPlayerJoined ')
      @status.count += 1
    elsif event.body.start_with?('[Behaviour] OnPlayerLeft ')
      @status.count -= 1
      @status.count = 0 if @status.count < 0
    end
  end
end
# status object shared by all events of the data source "every-sec"
StructDSEverySecStatus = Struct.new(:now, :quiet_mode, :output_mode, :last_time)

#
# Data source producing one event per second: the body (and the time) of
# each event is the Time of that second.
#
class OutvokeDataSourceEverySec < OutvokeDataSource
  def initialize(outvoke, label)
    super
    @status = StructDSEverySecStatus.new
    @status.last_time = Time.now.floor
    self.output_mode = "terminal"
  end

  # Yields one OutvokeEvent per second prepared by before_each_event.
  # Returns self.
  def each_event
    @log_lines.each do |entry|
      tick = entry["body"]

      ev = OutvokeEvent.new(tick)
      ev.time = tick # TODO
      ev.msgid = entry["msgid"]
      ev.status = @status
      ev.status.now = Time.now

      yield ev

      @status.last_time = tick
    end

    self
  end

  # Prepares one entry for every full second after @status.last_time that
  # lies before the current time. Returns nil.
  def before_each_event
    @log_lines = []

    tick = @status.last_time.floor + 1
    while tick < Time.now
      @log_lines << {"msgid" => @outvoke.generate_new_msgid, "body" => tick}
      tick += 1
    end

    nil
  end

  def after_each_event = nil

  def first_time_log_check(ds, event) = nil
end
# --------------------------------------------------------------------
# from this point forward, it will only be executed when this file run
# as a command, not when this file included by require.
if $0 == __FILE__
# print each line immediately, without buffering
$stdout.sync = true
# Prints a system message unless Outvoke is in quiet mode.
# Without line_head, str is printed on a single line (newlines become
# spaces); with line_head, every line of str is prefixed with it.
def log_puts(str, line_head = '')
return if $outvoke.quiet_mode?
if line_head == ''
puts str.gsub("\n", " ")
else
puts "#{line_head}#{str.gsub("\n", "\n" + line_head)}"
end
end
$outvoke = Outvoke.new
version_str = "#{$outvoke.version.branch} (version #{$outvoke.version.body})"
# environment variables are applied first; the command-line options parsed
# below are applied after them
$outvoke.read_env_settings
$outvoke.read_env_vars
require 'optparse'
is_version_puts_mode = false
ruby_code = nil
opts = OptionParser.new
opts.default_argv = ['main.rb']
opts.on('-v', '--version') { is_version_puts_mode = true }
opts.on('-e CODE') { |optvalue| ruby_code = optvalue }
opts.on('-q', '--quiet') { $outvoke.quiet_mode = true }
opts.on('--wait INTERVAL') do |optvalue|
raise "--wait option allows only numeric" if optvalue !~ /\A[0-9]+(\.[0-9]+)?\z/
$outvoke.wait = optvalue.to_f
end
# a value starting with "+" is appended to an output mode that is already
# set; any other value replaces it
opts.on('-O OUTPUT_MODE', '--output-mode OUTPUT_MODE') do |optvalue|
if $outvoke.default_output_mode
if optvalue.start_with?("+")
$outvoke.default_output_mode += optvalue
else
$outvoke.default_output_mode = optvalue
end
else
$outvoke.default_output_mode = optvalue
end
# these two modes also switch on quiet mode
if ["jsonl", "yaml"].include?(optvalue)
$outvoke.quiet_mode = true
end
end
# --var name:value assigns value to the global variable $name
opts.on('--var VAR:VALUE') do |optvalue|
raise "--var option only allows the format varname:value" if optvalue !~ /\A[a-z][_a-z0-9]*:/
optvalue.split(":", 2).then do |var_name, value|
if ["stdin", "stdout", "stderr", "outvoke", "o", "ds"].include?(var_name)
raise "invalid var name \"#{var_name}\""
end
# the value travels through @tmp so that only the validated name is
# interpolated into the evaluated code
instance_variable_set("@tmp", value)
eval "$#{var_name} = @tmp"
end
end
# parse options
if ARGV[0]
specified_file = opts.parse!(ARGV)[0]
else
specified_file = opts.parse![0]
end
if !ruby_code && !specified_file
specified_file = 'main.rb'
end
# output version info and exit
if is_version_puts_mode
if $outvoke.quiet_mode?
puts version_str
else
puts "Outvoke #{version_str}"
puts "Ruby #{RUBY_VERSION} (#{RUBY_DESCRIPTION})"
end
exit 0
end
log_puts "# starting Outvoke #{version_str} ---- #{Time.now}"
log_puts "# Ruby: #{RUBY_DESCRIPTION}"
log_puts "# ----"
# register the built-in data sources under their labels
'stdin'.then do
$outvoke.ds[_1] = OutvokeDataSourceStdin.new($outvoke, _1)
end
'auxin:fifo:1'.then do
$outvoke.ds[_1] = OutvokeDataSourceAuxinFifo.new($outvoke, _1)
end
'lo'.then do
$outvoke.ds[_1] = OutvokeDataSourceLoopback.new($outvoke, _1)
end
'every-sec'.then do
$outvoke.ds[_1] = OutvokeDataSourceEverySec.new($outvoke, _1)
end
'web-001'.then do
$outvoke.ds[_1] = OutvokeDataSourceWeb.new($outvoke, _1)
end
'osc-001'.then do
$outvoke.ds[_1] = OutvokeDataSourceOsc.new($outvoke, _1)
end
'vrchat-001'.then do
vrchat_log_dir = "#{Dir.home}/.steam/debian-installation/steamapps/compatdata/438100/pfx/drive_c/users/steamuser/AppData/LocalLow/VRChat/VRChat"
$outvoke.ds[_1] = OutvokeDataSourceVrchat.new($outvoke, _1)
$outvoke.ds[_1].log_dir = vrchat_log_dir
end
# an output mode given by environment variable or option applies to every data source
if $outvoke.default_output_mode
$outvoke.ds.each do |ds_label, ds|
ds.output_mode = $outvoke.default_output_mode
end
end
# optional configuration file, loaded before the user code
'./outvoke.conf.rb'.then do
if File.readable?(_1)
log_puts "# loading #{_1} ..."
require _1
end
end
# user code: either the code given with -e or the specified file
if ruby_code
log_puts '# given ruby code:'
log_puts ruby_code, '# '
eval "using OutvokeDSL;#{ruby_code}"
else
# a path not starting with "/" is prefixed with "./" before being required
file_prefix = ''
file_prefix = './' if not specified_file.start_with?('/')
specified_file = "#{file_prefix }#{specified_file}"
log_puts "# loading #{specified_file} ..."
if File.readable?(specified_file)
require specified_file
else
log_puts "# ERROR: #{specified_file} does not exist or is not readable"
exit 1
end
end
log_puts '# ----'
# the servers are started only if their data source was enabled
'web-001'.then do
if $outvoke[_1].enabled?
log_puts "# starting web server ..."
$outvoke[_1].websrv_start
end
end
'osc-001'.then do
if $outvoke[_1].enabled?
log_puts "# starting OSC server ..."
$outvoke[_1].server_start
end
end
# execute mainloop
begin
log_puts '# starting $outvoke.mainloop ...'
$outvoke.mainloop
rescue Interrupt
if not $outvoke.quiet_mode?
puts "[#{Time.now}] [outvoke-system] interrupted"
end
end
end