Broker-Enabled Communication Framework

Bro can now use the Broker Library to exchange information with other Bro processes. To enable it run Bro’s configure script with the --enable-broker option. Note that a C++11 compatible compiler (e.g. GCC 4.8+ or Clang 3.3+) is required as well as the C++ Actor Framework.

Connecting to Peers

Communication via Broker must first be turned on via BrokerComm::enable.

Bro can accept incoming connections by calling BrokerComm::listen and then monitor connection status updates via BrokerComm::incoming_connection_established and BrokerComm::incoming_connection_broken.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
connecting-listener.bro


const broker_port: port = 9999/tcp &redef;
redef exit_only_after_terminate = T;
redef BrokerComm::endpoint_name = "listener";

event bro_init()
	{
	BrokerComm::enable();
	BrokerComm::listen(broker_port, "127.0.0.1");
	}

event BrokerComm::incoming_connection_established(peer_name: string)
	{
	print "BrokerComm::incoming_connection_established", peer_name;
	}

event BrokerComm::incoming_connection_broken(peer_name: string)
	{
	print "BrokerComm::incoming_connection_broken", peer_name;
	terminate();
	}

Bro can initiate outgoing connections by calling BrokerComm::connect and then monitor connection status updates via BrokerComm::outgoing_connection_established, BrokerComm::outgoing_connection_broken, and BrokerComm::outgoing_connection_incompatible.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
connecting-connector.bro


const broker_port: port = 9999/tcp &redef;
redef exit_only_after_terminate = T;
redef BrokerComm::endpoint_name = "connector";

event bro_init()
	{
	BrokerComm::enable();
	BrokerComm::connect("127.0.0.1", broker_port, 1sec);
	}

event BrokerComm::outgoing_connection_established(peer_address: string,
                                            peer_port: port,
                                            peer_name: string)
	{
	print "BrokerComm::outgoing_connection_established",
		  peer_address, peer_port, peer_name;
	terminate();
	}

Remote Printing

To receive remote print messages, first use BrokerComm::subscribe_to_prints to advertise to peers a topic prefix of interest and then create an event handler for BrokerComm::print_handler to handle any print messages that are received.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
printing-listener.bro


const broker_port: port = 9999/tcp &redef;
redef exit_only_after_terminate = T;
redef BrokerComm::endpoint_name = "listener";
global msg_count = 0;

event bro_init()
	{
	BrokerComm::enable();
	BrokerComm::subscribe_to_prints("bro/print/");
	BrokerComm::listen(broker_port, "127.0.0.1");
	}

event BrokerComm::incoming_connection_established(peer_name: string)
	{
	print "BrokerComm::incoming_connection_established", peer_name;
	}

event BrokerComm::print_handler(msg: string)
	{
	++msg_count;
	print "got print message", msg;

	if ( msg_count == 3 )
		terminate();
	}

To send remote print messages, just call BrokerComm::print.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
printing-connector.bro

const broker_port: port = 9999/tcp &redef;
redef exit_only_after_terminate = T;
redef BrokerComm::endpoint_name = "connector";

event bro_init()
	{
	BrokerComm::enable();
	BrokerComm::connect("127.0.0.1", broker_port, 1sec);
	}

event BrokerComm::outgoing_connection_established(peer_address: string,
                                            peer_port: port,
                                            peer_name: string)
	{
	print "BrokerComm::outgoing_connection_established",
	      peer_address, peer_port, peer_name;
	BrokerComm::print("bro/print/hi", "hello");
	BrokerComm::print("bro/print/stuff", "...");
	BrokerComm::print("bro/print/bye", "goodbye");
	}

event BrokerComm::outgoing_connection_broken(peer_address: string,
                                       peer_port: port)
	{
	terminate();
	}

Notice that the subscriber only used the prefix “bro/print/”, but is able to receive messages with full topics of “bro/print/hi”, “bro/print/stuff”, and “bro/print/bye”. The model here is that the publisher of a message checks for all subscribers who advertised interest in a prefix of that message’s topic and sends it to them.

Message Format

For other applications that want to exchange print messages with Bro, the Broker message format is simply:

broker::message{std::string{}};

Remote Events

Receiving remote events is similar to remote prints. Just use BrokerComm::subscribe_to_events and possibly define any new events along with handlers that peers may want to send.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
events-listener.bro


const broker_port: port = 9999/tcp &redef;
redef exit_only_after_terminate = T;
redef BrokerComm::endpoint_name = "listener";
global msg_count = 0;
global my_event: event(msg: string, c: count);
global my_auto_event: event(msg: string, c: count);

event bro_init()
	{
	BrokerComm::enable();
	BrokerComm::subscribe_to_events("bro/event/");
	BrokerComm::listen(broker_port, "127.0.0.1");
	}

event BrokerComm::incoming_connection_established(peer_name: string)
	{
	print "BrokerComm::incoming_connection_established", peer_name;
	}

event my_event(msg: string, c: count)
	{
	++msg_count;
	print "got my_event", msg, c;

	if ( msg_count == 5 )
		terminate();
	}

event my_auto_event(msg: string, c: count)
	{
	++msg_count;
	print "got my_auto_event", msg, c;

	if ( msg_count == 5 )
		terminate();
	}

To send events, there are two choices. The first is to use call BrokerComm::event directly. The second option is to use BrokerComm::auto_event to make it so a particular event is automatically sent to peers whenever it is called locally via the normal event invocation syntax.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
events-connector.bro

const broker_port: port = 9999/tcp &redef;
redef exit_only_after_terminate = T;
redef BrokerComm::endpoint_name = "connector";
global my_event: event(msg: string, c: count);
global my_auto_event: event(msg: string, c: count);

event bro_init()
	{
	BrokerComm::enable();
	BrokerComm::connect("127.0.0.1", broker_port, 1sec);
	BrokerComm::auto_event("bro/event/my_auto_event", my_auto_event);
	}

event BrokerComm::outgoing_connection_established(peer_address: string,
                                            peer_port: port,
                                            peer_name: string)
	{
	print "BrokerComm::outgoing_connection_established",
	      peer_address, peer_port, peer_name;
	BrokerComm::event("bro/event/my_event", BrokerComm::event_args(my_event, "hi", 0));
	event my_auto_event("stuff", 88);
	BrokerComm::event("bro/event/my_event", BrokerComm::event_args(my_event, "...", 1));
	event my_auto_event("more stuff", 51);
	BrokerComm::event("bro/event/my_event", BrokerComm::event_args(my_event, "bye", 2));
	}

event BrokerComm::outgoing_connection_broken(peer_address: string,
                                       peer_port: port)
	{
	terminate();
	}

Again, the subscription model is prefix-based.

Message Format

For other applications that want to exchange event messages with Bro, the Broker message format is:

broker::message{std::string{}, ...};

The first parameter is the name of the event and the remaining ... are its arguments, which are any of the support Broker data types as they correspond to the Bro types for the event named in the first parameter of the message.

Remote Logging

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
testlog.bro


module Test;

export {
	redef enum Log::ID += { LOG };

	type Info: record {
		msg: string &log;
		num: count &log;
	};

	global log_test: event(rec: Test::Info);
}

event bro_init() &priority=5
	{
	BrokerComm::enable();
	Log::create_stream(Test::LOG, [$columns=Test::Info, $ev=log_test, $path="test"]);
	}

Use BrokerComm::subscribe_to_logs to advertise interest in logs written by peers. The topic names that Bro uses are implicitly of the form “bro/log/<stream-name>”.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
logs-listener.bro

@load ./testlog

const broker_port: port = 9999/tcp &redef;
redef exit_only_after_terminate = T;
redef BrokerComm::endpoint_name = "listener";

event bro_init()
	{
	BrokerComm::enable();
	BrokerComm::subscribe_to_logs("bro/log/Test::LOG");
	BrokerComm::listen(broker_port, "127.0.0.1");
	}

event BrokerComm::incoming_connection_established(peer_name: string)
	{
	print "BrokerComm::incoming_connection_established", peer_name;
	}

event Test::log_test(rec: Test::Info)
	{
	print "wrote log", rec;

	if ( rec$num == 5 )
		terminate();
	}

To send remote logs either use Log::enable_remote_logging or BrokerComm::enable_remote_logs. The former allows any log stream to be sent to peers while the later toggles remote logging for particular streams.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
logs-connector.bro

@load ./testlog

const broker_port: port = 9999/tcp &redef;
redef exit_only_after_terminate = T;
redef BrokerComm::endpoint_name = "connector";
redef Log::enable_local_logging = F;
redef Log::enable_remote_logging = F;
global n = 0;

event bro_init()
	{
	BrokerComm::enable();
	BrokerComm::enable_remote_logs(Test::LOG);
	BrokerComm::connect("127.0.0.1", broker_port, 1sec);
	}

event do_write()
	{
	if ( n == 6 )
		return;

	Log::write(Test::LOG, [$msg = "ping", $num = n]);
	++n;
	event do_write();
	}

event BrokerComm::outgoing_connection_established(peer_address: string,
                                            peer_port: port,
                                            peer_name: string)
	{
	print "BrokerComm::outgoing_connection_established",
	      peer_address, peer_port, peer_name;
	event do_write();
	}

event BrokerComm::outgoing_connection_broken(peer_address: string,
                                       peer_port: port)
	{
	terminate();
	}

Message Format

For other applications that want to exchange logs messages with Bro, the Broker message format is:

broker::message{broker::enum_value{}, broker::record{}};

The enum value corresponds to the stream’s Log::ID value, and the record corresponds to a single entry of that log’s columns record, in this case a Test::INFO value.

Tuning Access Control

By default, endpoints do not restrict the message topics that it sends to peers and do not restrict what message topics and data store identifiers get advertised to peers. These are the default BrokerComm::EndpointFlags supplied to BrokerComm::enable.

If not using the auto_publish flag, one can use the BrokerComm::publish_topic and BrokerComm::unpublish_topic functions to manipulate the set of message topics (must match exactly) that are allowed to be sent to peer endpoints. These settings take precedence over the per-message peers flag supplied to functions that take a BrokerComm::SendFlags such as BrokerComm::print, BrokerComm::event, BrokerComm::auto_event or BrokerComm::enable_remote_logs.

If not using the auto_advertise flag, one can use the BrokerComm::advertise_topic and BrokerComm::unadvertise_topic to manupulate the set of topic prefixes that are allowed to be advertised to peers. If an endpoint does not advertise a topic prefix, the only way a peers can send messages to it is via the unsolicited flag of BrokerComm::SendFlags and choosing a topic with a matching prefix (i.e. full topic may be longer than receivers prefix, just the prefix needs to match).

Distributed Data Stores

There are three flavors of key-value data store interfaces: master, clone, and frontend.

A frontend is the common interface to query and modify data stores. That is, a clone is a specific type of frontend and a master is also a specific type of frontend, but a standalone frontend can also exist to e.g. query and modify the contents of a remote master store without actually “owning” any of the contents itself.

A master data store can be be cloned from remote peers which may then perform lightweight, local queries against the clone, which automatically stays synchronized with the master store. Clones cannot modify their content directly, instead they send modifications to the centralized master store which applies them and then broadcasts them to all clones.

Master and clone stores get to choose what type of storage backend to use. E.g. In-memory versus SQLite for persistence. Note that if clones are used, data store sizes should still be able to fit within memory regardless of the storage backend as a single snapshot of the master store is sent in a single chunk to initialize the clone.

Data stores also support expiration on a per-key basis either using an absolute point in time or a relative amount of time since the entry’s last modification time.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
stores-listener.bro

const broker_port: port = 9999/tcp &redef;
redef exit_only_after_terminate = T;

global h: opaque of BrokerStore::Handle;
global expected_key_count = 4;
global key_count = 0;

function do_lookup(key: string)
	{
	when ( local res = BrokerStore::lookup(h, BrokerComm::data(key)) )
		{
		++key_count;
		print "lookup", key, res;

		if ( key_count == expected_key_count )
			terminate();
		}
	timeout 10sec
		{ print "timeout", key; }
	}

event ready()
	{
	h = BrokerStore::create_clone("mystore");

	when ( local res = BrokerStore::keys(h) )
		{
		print "clone keys", res;
		do_lookup(BrokerComm::refine_to_string(BrokerComm::vector_lookup(res$result, 0)));
		do_lookup(BrokerComm::refine_to_string(BrokerComm::vector_lookup(res$result, 1)));
		do_lookup(BrokerComm::refine_to_string(BrokerComm::vector_lookup(res$result, 2)));
		do_lookup(BrokerComm::refine_to_string(BrokerComm::vector_lookup(res$result, 3)));
		}
	timeout 10sec
		{ print "timeout"; }
	}

event bro_init()
	{
	BrokerComm::enable();
	BrokerComm::subscribe_to_events("bro/event/ready");
	BrokerComm::listen(broker_port, "127.0.0.1");
	}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
stores-connector.bro

const broker_port: port = 9999/tcp &redef;
redef exit_only_after_terminate = T;

global h: opaque of BrokerStore::Handle;

function dv(d: BrokerComm::Data): BrokerComm::DataVector
	{
	local rval: BrokerComm::DataVector;
	rval[0] = d;
	return rval;
	}

global ready: event();

event BrokerComm::outgoing_connection_broken(peer_address: string,
                                       peer_port: port)
	{
	terminate();
	}

event BrokerComm::outgoing_connection_established(peer_address: string,
                                            peer_port: port,
                                            peer_name: string)
	{
	local myset: set[string] = {"a", "b", "c"};
	local myvec: vector of string = {"alpha", "beta", "gamma"};
	h = BrokerStore::create_master("mystore");
	BrokerStore::insert(h, BrokerComm::data("one"), BrokerComm::data(110));
	BrokerStore::insert(h, BrokerComm::data("two"), BrokerComm::data(223));
	BrokerStore::insert(h, BrokerComm::data("myset"), BrokerComm::data(myset));
	BrokerStore::insert(h, BrokerComm::data("myvec"), BrokerComm::data(myvec));
	BrokerStore::increment(h, BrokerComm::data("one"));
	BrokerStore::decrement(h, BrokerComm::data("two"));
	BrokerStore::add_to_set(h, BrokerComm::data("myset"), BrokerComm::data("d"));
	BrokerStore::remove_from_set(h, BrokerComm::data("myset"), BrokerComm::data("b"));
	BrokerStore::push_left(h, BrokerComm::data("myvec"), dv(BrokerComm::data("delta")));
	BrokerStore::push_right(h, BrokerComm::data("myvec"), dv(BrokerComm::data("omega")));

	when ( local res = BrokerStore::size(h) )
		{
		print "master size", res;
		event ready();
		}
	timeout 10sec
		{ print "timeout"; }
	}

event bro_init()
	{
	BrokerComm::enable();
	BrokerComm::connect("127.0.0.1", broker_port, 1secs);
	BrokerComm::auto_event("bro/event/ready", ready);
	}

In the above example, if a local copy of the store contents isn’t needed, just replace the BrokerStore::create_clone call with BrokerStore::create_frontend. Queries will then be made against the remote master store instead of the local clone.

Note that all queries are made within Bro’s asynchrounous when statements and must specify a timeout block.

Copyright 2013, The Bro Project. Last updated on July 19, 2016. Created using Sphinx 1.4.4.