let make_channel
shard_id
string_of_read_message
string_of_written_message
fd_read
fd_write =
let () =
set_nonblock fd_read;
set_close_on_exec fd_read;
set_close_on_exec fd_write
in
let chn_write = out_channel_of_descr fd_write in
let really_read fd str =
let off = ref 0 in
let read = ref 0 in
while !read < String.length str do
try
let one_read =
Unix.read fd str !off (String.length str - !off)
in
read := !read + one_read;
off := !off + one_read
with Unix_error(EAGAIN, _, _) ->
()
done;
str
in
let header_str = String.create Marshal.header_size in
let send_data msg =
Marshal.to_channel chn_write msg [];
Pervasives.flush chn_write
in
let receive_data () =
try
let data_size = Marshal.data_size (really_read fd_read header_str) 0 in
let data_str = really_read fd_read (String.create data_size) in
let msg = Marshal.from_string (header_str ^ data_str) 0 in
msg
with Failure(msg) ->
OUnitUtils.failwithf "Communication error with worker processes: %s" msg
in
let close () =
close_out chn_write;
in
wrap_channel
shard_id
string_of_read_message
string_of_written_message
{
send_data = send_data;
receive_data = receive_data;
close = close
}