Bro can now use the Broker Library to exchange information with other Bro processes. To enable it, run Bro’s configure script with the --enable-broker option. Note that a C++11-compatible compiler (e.g. GCC 4.8+ or Clang 3.3+) is required, as well as the C++ Actor Framework.
Communication via Broker must first be turned on via BrokerComm::enable.
Bro can accept incoming connections by calling BrokerComm::listen and then monitor connection status updates via BrokerComm::incoming_connection_established and BrokerComm::incoming_connection_broken.
connecting-listener.bro

    const broker_port: port = 9999/tcp &redef;
    redef exit_only_after_terminate = T;
    redef BrokerComm::endpoint_name = "listener";

    event bro_init()
        {
        BrokerComm::enable();
        BrokerComm::listen(broker_port, "127.0.0.1");
        }

    event BrokerComm::incoming_connection_established(peer_name: string)
        {
        print "BrokerComm::incoming_connection_established", peer_name;
        }

    event BrokerComm::incoming_connection_broken(peer_name: string)
        {
        print "BrokerComm::incoming_connection_broken", peer_name;
        terminate();
        }

Bro can initiate outgoing connections by calling BrokerComm::connect and then monitor connection status updates via BrokerComm::outgoing_connection_established, BrokerComm::outgoing_connection_broken, and BrokerComm::outgoing_connection_incompatible.
connecting-connector.bro

    const broker_port: port = 9999/tcp &redef;
    redef exit_only_after_terminate = T;
    redef BrokerComm::endpoint_name = "connector";

    event bro_init()
        {
        BrokerComm::enable();
        BrokerComm::connect("127.0.0.1", broker_port, 1sec);
        }

    event BrokerComm::outgoing_connection_established(peer_address: string,
                                                      peer_port: port,
                                                      peer_name: string)
        {
        print "BrokerComm::outgoing_connection_established",
              peer_address, peer_port, peer_name;
        terminate();
        }

To receive remote print messages, first use BrokerComm::subscribe_to_prints to advertise to peers a topic prefix of interest and then create an event handler for BrokerComm::print_handler to handle any print messages that are received.
printing-listener.bro

    const broker_port: port = 9999/tcp &redef;
    redef exit_only_after_terminate = T;
    redef BrokerComm::endpoint_name = "listener";
    global msg_count = 0;

    event bro_init()
        {
        BrokerComm::enable();
        BrokerComm::subscribe_to_prints("bro/print/");
        BrokerComm::listen(broker_port, "127.0.0.1");
        }

    event BrokerComm::incoming_connection_established(peer_name: string)
        {
        print "BrokerComm::incoming_connection_established", peer_name;
        }

    event BrokerComm::print_handler(msg: string)
        {
        ++msg_count;
        print "got print message", msg;

        if ( msg_count == 3 )
            terminate();
        }

To send remote print messages, just call BrokerComm::print.
printing-connector.bro

    const broker_port: port = 9999/tcp &redef;
    redef exit_only_after_terminate = T;
    redef BrokerComm::endpoint_name = "connector";

    event bro_init()
        {
        BrokerComm::enable();
        BrokerComm::connect("127.0.0.1", broker_port, 1sec);
        }

    event BrokerComm::outgoing_connection_established(peer_address: string,
                                                      peer_port: port,
                                                      peer_name: string)
        {
        print "BrokerComm::outgoing_connection_established",
              peer_address, peer_port, peer_name;
        BrokerComm::print("bro/print/hi", "hello");
        BrokerComm::print("bro/print/stuff", "...");
        BrokerComm::print("bro/print/bye", "goodbye");
        }

    event BrokerComm::outgoing_connection_broken(peer_address: string,
                                                 peer_port: port)
        {
        terminate();
        }

Notice that the subscriber only used the prefix “bro/print/”, but is able to receive messages with full topics of “bro/print/hi”, “bro/print/stuff”, and “bro/print/bye”. The model here is that the publisher of a message checks for all subscribers who advertised interest in a prefix of that message’s topic and sends it to them.
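The prefix rule described above can be modeled in a few lines. The following is a minimal sketch in Python, not Broker's implementation: the function name matching_subscribers, the second peer "other", and the table layout are hypothetical and exist only to illustrate how a publisher might select recipients by prefix.

```python
from typing import Dict, List, Set


def matching_subscribers(topic: str, subscriptions: Dict[str, Set[str]]) -> List[str]:
    """Return the peers that advertised at least one prefix of `topic`."""
    return sorted(
        peer
        for peer, prefixes in subscriptions.items()
        if any(topic.startswith(prefix) for prefix in prefixes)
    )


# Hypothetical subscription table: peer name -> advertised topic prefixes.
subscriptions = {
    "listener": {"bro/print/"},
    "other": {"bro/event/"},
}

for topic in ("bro/print/hi", "bro/print/stuff", "bro/print/bye", "bro/log/Test::LOG"):
    # The publisher sends the message only to the peers returned here.
    print(topic, "->", matching_subscribers(topic, subscriptions))
```

In this model the three "bro/print/..." topics all reach "listener", while a topic no peer has a prefix for reaches nobody.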
For other applications that want to exchange print messages with Bro, the Broker message format is simply:
broker::message{std::string{}};
Receiving remote events is similar to receiving remote prints. Just use BrokerComm::subscribe_to_events and, where needed, define any new events that peers may want to send, along with their handlers.
events-listener.bro

    const broker_port: port = 9999/tcp &redef;
    redef exit_only_after_terminate = T;
    redef BrokerComm::endpoint_name = "listener";
    global msg_count = 0;
    global my_event: event(msg: string, c: count);
    global my_auto_event: event(msg: string, c: count);

    event bro_init()
        {
        BrokerComm::enable();
        BrokerComm::subscribe_to_events("bro/event/");
        BrokerComm::listen(broker_port, "127.0.0.1");
        }

    event BrokerComm::incoming_connection_established(peer_name: string)
        {
        print "BrokerComm::incoming_connection_established", peer_name;
        }

    event my_event(msg: string, c: count)
        {
        ++msg_count;
        print "got my_event", msg, c;

        if ( msg_count == 5 )
            terminate();
        }

    event my_auto_event(msg: string, c: count)
        {
        ++msg_count;
        print "got my_auto_event", msg, c;

        if ( msg_count == 5 )
            terminate();
        }

To send events, there are two choices. The first is to call BrokerComm::event directly. The second is to use BrokerComm::auto_event so that a particular event is automatically sent to peers whenever it is raised locally via the normal event invocation syntax.
events-connector.bro

    const broker_port: port = 9999/tcp &redef;
    redef exit_only_after_terminate = T;
    redef BrokerComm::endpoint_name = "connector";
    global my_event: event(msg: string, c: count);
    global my_auto_event: event(msg: string, c: count);

    event bro_init()
        {
        BrokerComm::enable();
        BrokerComm::connect("127.0.0.1", broker_port, 1sec);
        BrokerComm::auto_event("bro/event/my_auto_event", my_auto_event);
        }

    event BrokerComm::outgoing_connection_established(peer_address: string,
                                                      peer_port: port,
                                                      peer_name: string)
        {
        print "BrokerComm::outgoing_connection_established",
              peer_address, peer_port, peer_name;
        BrokerComm::event("bro/event/my_event", BrokerComm::event_args(my_event, "hi", 0));
        event my_auto_event("stuff", 88);
        BrokerComm::event("bro/event/my_event", BrokerComm::event_args(my_event, "...", 1));
        event my_auto_event("more stuff", 51);
        BrokerComm::event("bro/event/my_event", BrokerComm::event_args(my_event, "bye", 2));
        }

    event BrokerComm::outgoing_connection_broken(peer_address: string,
                                                 peer_port: port)
        {
        terminate();
        }

Again, the subscription model is prefix-based.
For other applications that want to exchange event messages with Bro, the Broker message format is:
broker::message{std::string{}, ...};
The first parameter is the name of the event and the remaining ... are its arguments, which are any of the supported Broker data types as they correspond to the Bro types for the event named in the first parameter of the message.
testlog.bro

    module Test;

    export {
        redef enum Log::ID += { LOG };

        type Info: record {
            msg: string &log;
            num: count &log;
        };

        global log_test: event(rec: Test::Info);
    }

    event bro_init() &priority=5
        {
        BrokerComm::enable();
        Log::create_stream(Test::LOG, [$columns=Test::Info, $ev=log_test, $path="test"]);
        }

Use BrokerComm::subscribe_to_logs to advertise interest in logs written by peers. The topic names that Bro uses are implicitly of the form “bro/log/<stream-name>”.
logs-listener.bro

    @load ./testlog

    const broker_port: port = 9999/tcp &redef;
    redef exit_only_after_terminate = T;
    redef BrokerComm::endpoint_name = "listener";

    event bro_init()
        {
        BrokerComm::enable();
        BrokerComm::subscribe_to_logs("bro/log/Test::LOG");
        BrokerComm::listen(broker_port, "127.0.0.1");
        }

    event BrokerComm::incoming_connection_established(peer_name: string)
        {
        print "BrokerComm::incoming_connection_established", peer_name;
        }

    event Test::log_test(rec: Test::Info)
        {
        print "wrote log", rec;

        if ( rec$num == 5 )
            terminate();
        }

To send remote logs, use either Log::enable_remote_logging or BrokerComm::enable_remote_logs. The former allows any log stream to be sent to peers, while the latter toggles remote logging for particular streams.
logs-connector.bro

    @load ./testlog

    const broker_port: port = 9999/tcp &redef;
    redef exit_only_after_terminate = T;
    redef BrokerComm::endpoint_name = "connector";
    redef Log::enable_local_logging = F;
    redef Log::enable_remote_logging = F;
    global n = 0;

    event bro_init()
        {
        BrokerComm::enable();
        BrokerComm::enable_remote_logs(Test::LOG);
        BrokerComm::connect("127.0.0.1", broker_port, 1sec);
        }

    event do_write()
        {
        if ( n == 6 )
            return;

        Log::write(Test::LOG, [$msg = "ping", $num = n]);
        ++n;
        event do_write();
        }

    event BrokerComm::outgoing_connection_established(peer_address: string,
                                                      peer_port: port,
                                                      peer_name: string)
        {
        print "BrokerComm::outgoing_connection_established",
              peer_address, peer_port, peer_name;
        event do_write();
        }

    event BrokerComm::outgoing_connection_broken(peer_address: string,
                                                 peer_port: port)
        {
        terminate();
        }

For other applications that want to exchange log messages with Bro, the Broker message format is:
broker::message{broker::enum_value{}, broker::record{}};
The enum value corresponds to the stream’s Log::ID value, and the record corresponds to a single entry of that log’s columns record, in this case a Test::Info value.
By default, an endpoint does not restrict the message topics that it sends to peers, nor the message topics and data store identifiers that get advertised to peers. These are the default BrokerComm::EndpointFlags supplied to BrokerComm::enable.
If not using the auto_publish flag, one can use the BrokerComm::publish_topic and BrokerComm::unpublish_topic functions to manipulate the set of message topics (must match exactly) that are allowed to be sent to peer endpoints. These settings take precedence over the per-message peers flag supplied to functions that take a BrokerComm::SendFlags, such as BrokerComm::print, BrokerComm::event, BrokerComm::auto_event, or BrokerComm::enable_remote_logs.
If not using the auto_advertise flag, one can use the BrokerComm::advertise_topic and BrokerComm::unadvertise_topic functions to manipulate the set of topic prefixes that are allowed to be advertised to peers. If an endpoint does not advertise a topic prefix, the only way a peer can send messages to it is via the unsolicited flag of BrokerComm::SendFlags and choosing a topic with a matching prefix (i.e. the full topic may be longer than the receiver’s prefix; just the prefix needs to match).
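One way to read the access-control rules above is as two independent checks: an exact-match check on the sending side and a prefix check on the receiving side. The sketch below is a simplified Python model under that reading, not Broker's actual logic. The Endpoint class, its field names, and the may_send and accepts methods are hypothetical; only the flag names auto_publish, auto_advertise, peers, and unsolicited come from the text.

```python
from dataclasses import dataclass, field
from typing import Set


@dataclass
class Endpoint:
    auto_publish: bool = True      # default: no restriction on outgoing topics
    auto_advertise: bool = True    # default: every subscription is advertised
    published: Set[str] = field(default_factory=set)   # exact topics allowed out
    subscribed: Set[str] = field(default_factory=set)  # topic prefixes of interest
    advertised: Set[str] = field(default_factory=set)  # prefixes allowed to be advertised

    def may_send(self, topic: str, peers: bool = True) -> bool:
        """Sender side: the published set must match the topic exactly."""
        if not peers:
            return False
        return self.auto_publish or topic in self.published

    def visible_prefixes(self) -> Set[str]:
        """Prefixes that peers get to see (assumed: advertised subscriptions)."""
        if self.auto_advertise:
            return self.subscribed
        return self.subscribed & self.advertised

    def accepts(self, topic: str, unsolicited: bool = False) -> bool:
        """Receiver side: prefix match, visible to peers or sent unsolicited."""
        if not any(topic.startswith(p) for p in self.subscribed):
            return False
        if any(topic.startswith(p) for p in self.visible_prefixes()):
            return True
        return unsolicited


sender = Endpoint(auto_publish=False, published={"bro/print/hi"})
receiver = Endpoint(auto_advertise=False, subscribed={"bro/print/"})

print(sender.may_send("bro/print/hi"))                      # exact match
print(sender.may_send("bro/print/bye"))                     # not published
print(receiver.accepts("bro/print/hi"))                     # prefix not advertised
print(receiver.accepts("bro/print/hi", unsolicited=True))   # unsolicited + prefix match
```

The asymmetry is the point of the sketch: the outgoing filter compares whole topics, while the incoming side only ever compares prefixes.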
There are three flavors of key-value data store interfaces: master, clone, and frontend.
A frontend is the common interface to query and modify data stores. That is, a clone is a specific type of frontend and a master is also a specific type of frontend, but a standalone frontend can also exist to e.g. query and modify the contents of a remote master store without actually “owning” any of the contents itself.
A master data store can be cloned by remote peers, which may then perform lightweight, local queries against the clone; the clone automatically stays synchronized with the master store. Clones cannot modify their content directly; instead, they send modifications to the centralized master store, which applies them and then broadcasts them to all clones.
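The write path described above (a clone forwards the change, the master applies it, and the master broadcasts it) can be illustrated with a small in-process model. This is a sketch in Python with hypothetical Master and Clone classes. It ignores networking and asynchrony entirely and is not how BrokerStore is implemented.

```python
class Master:
    """Owns the authoritative contents and keeps clones in sync."""

    def __init__(self):
        self.data = {}
        self.clones = []

    def attach(self, clone):
        # A new clone starts from a snapshot of the master's contents.
        clone.data = dict(self.data)
        self.clones.append(clone)

    def apply(self, key, value):
        # Apply the modification, then broadcast it to every clone.
        self.data[key] = value
        for clone in self.clones:
            clone.data[key] = value


class Clone:
    """Answers lookups locally; never modifies its own copy directly."""

    def __init__(self, master):
        self.master = master
        self.data = {}
        master.attach(self)

    def lookup(self, key):
        return self.data.get(key)

    def insert(self, key, value):
        # Forward the change; the local copy updates via the broadcast.
        self.master.apply(key, value)


master = Master()
master.apply("one", 110)
first, second = Clone(master), Clone(master)
first.insert("two", 223)
print(first.lookup("one"), second.lookup("two"))
```

Because every write goes through the master, both clones see "two" even though only one of them inserted it.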
Master and clone stores get to choose what type of storage backend to use, e.g. in-memory versus SQLite for persistence. Note that if clones are used, data store sizes should still be able to fit within memory regardless of the storage backend, as a single snapshot of the master store is sent in a single chunk to initialize the clone.
Data stores also support expiration on a per-key basis either using an absolute point in time or a relative amount of time since the entry’s last modification time.
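The difference between the two expiration modes shows up when an entry is modified: a relative expiry is measured from the last modification, while an absolute expiry is not affected by it. The following Python sketch models that behavior with explicit timestamps. The Entry and Store classes and the expires_at and ttl parameter names are hypothetical and are not part of the BrokerStore API.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class Entry:
    value: Any
    modified: float                     # time of last modification
    expires_at: Optional[float] = None  # absolute point in time
    ttl: Optional[float] = None         # relative to last modification

    def expired(self, now: float) -> bool:
        if self.expires_at is not None and now >= self.expires_at:
            return True
        if self.ttl is not None and now >= self.modified + self.ttl:
            return True
        return False


class Store:
    def __init__(self) -> None:
        self.entries: Dict[str, Entry] = {}

    def insert(self, key, value, now, expires_at=None, ttl=None):
        self.entries[key] = Entry(value, now, expires_at, ttl)

    def update(self, key, value, now):
        entry = self.entries[key]
        entry.value = value
        entry.modified = now            # restarts a relative expiry only

    def lookup(self, key, now):
        entry = self.entries.get(key)
        if entry is None:
            return None
        if entry.expired(now):
            del self.entries[key]
            return None
        return entry.value


store = Store()
store.insert("rel", 1, now=0, ttl=10)          # expires 10 after last change
store.insert("abs", 1, now=0, expires_at=12)   # expires at time 12
store.update("rel", 2, now=8)
store.update("abs", 2, now=8)
print(store.lookup("rel", now=15), store.lookup("abs", now=15))
```

At time 15 the relative entry is still present because its clock restarted at time 8, while the absolute entry is gone.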
stores-listener.bro

    const broker_port: port = 9999/tcp &redef;
    redef exit_only_after_terminate = T;

    global h: opaque of BrokerStore::Handle;
    global expected_key_count = 4;
    global key_count = 0;

    function do_lookup(key: string)
        {
        when ( local res = BrokerStore::lookup(h, BrokerComm::data(key)) )
            {
            ++key_count;
            print "lookup", key, res;

            if ( key_count == expected_key_count )
                terminate();
            }
        timeout 10sec
            { print "timeout", key; }
        }

    event ready()
        {
        h = BrokerStore::create_clone("mystore");

        when ( local res = BrokerStore::keys(h) )
            {
            print "clone keys", res;
            do_lookup(BrokerComm::refine_to_string(BrokerComm::vector_lookup(res$result, 0)));
            do_lookup(BrokerComm::refine_to_string(BrokerComm::vector_lookup(res$result, 1)));
            do_lookup(BrokerComm::refine_to_string(BrokerComm::vector_lookup(res$result, 2)));
            do_lookup(BrokerComm::refine_to_string(BrokerComm::vector_lookup(res$result, 3)));
            }
        timeout 10sec
            { print "timeout"; }
        }

    event bro_init()
        {
        BrokerComm::enable();
        BrokerComm::subscribe_to_events("bro/event/ready");
        BrokerComm::listen(broker_port, "127.0.0.1");
        }

stores-connector.bro

    const broker_port: port = 9999/tcp &redef;
    redef exit_only_after_terminate = T;

    global h: opaque of BrokerStore::Handle;

    function dv(d: BrokerComm::Data): BrokerComm::DataVector
        {
        local rval: BrokerComm::DataVector;
        rval[0] = d;
        return rval;
        }

    global ready: event();

    event BrokerComm::outgoing_connection_broken(peer_address: string,
                                                 peer_port: port)
        {
        terminate();
        }

    event BrokerComm::outgoing_connection_established(peer_address: string,
                                                      peer_port: port,
                                                      peer_name: string)
        {
        local myset: set[string] = {"a", "b", "c"};
        local myvec: vector of string = {"alpha", "beta", "gamma"};
        h = BrokerStore::create_master("mystore");
        BrokerStore::insert(h, BrokerComm::data("one"), BrokerComm::data(110));
        BrokerStore::insert(h, BrokerComm::data("two"), BrokerComm::data(223));
        BrokerStore::insert(h, BrokerComm::data("myset"), BrokerComm::data(myset));
        BrokerStore::insert(h, BrokerComm::data("myvec"), BrokerComm::data(myvec));
        BrokerStore::increment(h, BrokerComm::data("one"));
        BrokerStore::decrement(h, BrokerComm::data("two"));
        BrokerStore::add_to_set(h, BrokerComm::data("myset"), BrokerComm::data("d"));
        BrokerStore::remove_from_set(h, BrokerComm::data("myset"), BrokerComm::data("b"));
        BrokerStore::push_left(h, BrokerComm::data("myvec"), dv(BrokerComm::data("delta")));
        BrokerStore::push_right(h, BrokerComm::data("myvec"), dv(BrokerComm::data("omega")));

        when ( local res = BrokerStore::size(h) )
            {
            print "master size", res;
            event ready();
            }
        timeout 10sec
            { print "timeout"; }
        }

    event bro_init()
        {
        BrokerComm::enable();
        BrokerComm::connect("127.0.0.1", broker_port, 1secs);
        BrokerComm::auto_event("bro/event/ready", ready);
        }

In the above example, if a local copy of the store contents isn’t needed, just replace the BrokerStore::create_clone call with BrokerStore::create_frontend. Queries will then be made against the remote master store instead of the local clone.
Note that all queries are made within Bro’s asynchronous when statements and must specify a timeout block.
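For readers more familiar with general-purpose languages, the pattern of an asynchronous query paired with a mandatory timeout branch is loosely analogous to awaiting a result with a deadline. The sketch below uses Python's asyncio purely as an analogy. The lookup and query functions and the simulated delay are hypothetical, and nothing here reflects how Bro schedules when statements internally.

```python
import asyncio


async def lookup(store, key, delay):
    """Stand-in for a remote store query that answers after `delay` seconds."""
    await asyncio.sleep(delay)
    return store.get(key)


async def query(store, key, delay, timeout):
    """Run the query, falling back to a timeout branch if no answer arrives."""
    try:
        result = await asyncio.wait_for(lookup(store, key, delay), timeout)
        return ("lookup", result)       # plays the role of the when body
    except asyncio.TimeoutError:
        return ("timeout", None)        # plays the role of the timeout block


store = {"one": 111}
print(asyncio.run(query(store, "one", delay=0.01, timeout=1.0)))
print(asyncio.run(query(store, "one", delay=1.0, timeout=0.01)))
```

As in the Bro examples, the caller always supplies both branches, so a query that never completes cannot leave the script waiting indefinitely.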