We illustrate the usage of Setak with a well-known example, BoundedBuffer. The code for the BoundedBuffer actor is shown below:
    class BoundedBuffer(size: Int) extends Actor {
      var content = new Array[Int](size)
      var head, tail, curSize = 0

      def receive = {
        case Put(x) ⇒
          if (curSize < size) {
            content(tail) = x
            tail = (tail + 1) % size
            curSize += 1
          }
        case Get ⇒
          if (curSize > 0) {
            val r = content(head)
            head = (head + 1) % size
            curSize -= 1
            self.reply(r)
          } else self.reply(-2)
      }
    }
This actor accepts two messages, Put(x) and Get. Upon processing Put(x), if the current size of the buffer
is less than the maximum size, it adds x to its content; otherwise x is dropped. We assume that x is a positive integer,
so a reply of -2 unambiguously signals an empty buffer.
Upon processing a Get message, if the current size is greater
than zero, it removes the oldest value from its content and replies with that value. Otherwise, it replies with -2.
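The ring-buffer logic can also be exercised in isolation, without Akka. The following is a minimal sketch under that assumption: BufferModel is a hypothetical plain class (it is not part of Setak or of the example above) whose put and get methods mirror the Put(x) and Get handlers, with get returning the value that the actor would send as its reply.

```scala
// Hypothetical, actor-free model of the BoundedBuffer logic shown above.
// It is an illustration only; the real example handles Put/Get as messages.
class BufferModel(size: Int) {
  private val content = new Array[Int](size)
  private var head, tail = 0
  var curSize = 0

  // Mirrors `case Put(x)`: the value is silently dropped when the buffer is full.
  def put(x: Int): Unit =
    if (curSize < size) {
      content(tail) = x
      tail = (tail + 1) % size
      curSize += 1
    }

  // Mirrors `case Get`: returns the oldest value, or -2 when the buffer is empty.
  def get(): Int =
    if (curSize > 0) {
      val r = content(head)
      head = (head + 1) % size
      curSize -= 1
      r
    } else -2
}

object BufferModelDemo {
  def main(args: Array[String]): Unit = {
    val buf = new BufferModel(1)
    buf.put(1)
    buf.put(2)         // dropped: the buffer already holds one element
    println(buf.get()) // 1
    println(buf.get()) // -2
  }
}
```

With a capacity of one, the second put leaves the state unchanged, which is the behavior the two-Put test scenario checks for the actor.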
Unit Testing
We want to test the behavior of a bounded buffer actor with a capacity of one element upon receiving
Put and Get messages. We consider two scenarios:
1. A Put message followed by a Get message: For the Put message, we send the message and then wait for the actor to process it (that is, until the system reaches a stable state).
   We then check the internal state of the actor (white-box testing). After that, we send a Get message and wait for the reply from the bounded buffer (black-box testing).
2. Two Put messages: The other scenario worth checking is the case where a buffer with a capacity of one element receives two
   Put messages. In this case, it should process both messages but update its content
   only upon processing the first Put message.
The test cases for these two scenarios can be written in JUnit, FlatSpec, or WordSpec format. For JUnit, the test class should extend SetakJUnit; for
FlatSpec or WordSpec, use the SetakFlatSpec or SetakWordSpec trait, respectively.
To let the test control the actors in the program under test, create the actors with the factory provided by Setak, i.e. akka.setak.TestActorRefFactory.actorOf.
This factory returns an instance of TestActorRef as a reference to the actor. Each Setak test has a testActorRefFactory field, and actors should be created by calling
the actorOf method of that factory. Calling actorOf inside the test class automatically uses that factory.
The JUnit test suite for these scenarios is shown below. Note that a JUnit test class should extend SetakJUnit.
To check whether the two Put messages have been processed, we need to define the Put message sent to the buffer as a TestMessageEnvelop.
Each TestMessageEnvelop consists of three parameters: sender, receiver, and message.
The message parameter can be an object (using the testMessageEnvelop factory) or a pattern in the form of a partial
function (using the testMessagePatternEnvelop factory).
The wildcard for sender and receiver is anyActorRef, and the wildcard for message is anyMessage.
In this test, since the sender is the test class, which is not an actor, we set the sender parameter to anyActorRef. Furthermore, because the test message envelope put
should refer to any Put(x) message regardless of the value of x, it is created using the testMessagePatternEnvelop factory.
Use the org.junit.Test annotation to mark test methods, and the org.junit.Before and org.junit.After annotations to mark the methods that should be
called before and after each test.
    class JUnitTestBoundedBuffer extends SetakJUnit {
      var buf: TestActorRef = _
      var put: TestMessageEnvelop = _

      @Before
      def setUp() {
        buf = actorOf(new BoundedBuffer(1)).start
        // Matches any Put(x) sent to buf, regardless of the sender and of x
        put = testMessagePatternEnvelop(anyActorRef, buf, { case Put(_) ⇒ })
      }

      @Test
      def testPutGet() {
        buf ! Put(12)
        // White-box check: inspect the actor's state once the system is stable
        whenStable {
          assert(buf.actorObject[BoundedBuffer].curSize == 1, "The current size is not one")
        }
        // Black-box check: wait for the reply to Get
        val getValue = (buf ? Get).mapTo[Int].get
        assert(getValue == 12, "The returned value is not 12")
      }

      @Test
      def testTwoPuts() {
        buf ! Put(1)
        buf ! Put(2)
        whenStable {
          assert(buf.actorObject[BoundedBuffer].curSize == 1, "The current size is not one")
          assert(processingCount(put) == 2, "Put has not been processed twice")
        }
      }
    }
The processingCount API accepts a test message envelope and returns the number of times a matching message has been processed.
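To make the matching rules concrete, the sketch below models an envelope as a sender, a receiver, and a message pattern, and counts how many processed messages it matches. It is only an illustration under stated assumptions: EnvelopeModel, Delivered, and the use of None as a wildcard are hypothetical names chosen here, actors are represented by plain strings, and nothing in it is Setak's actual implementation.

```scala
// Hypothetical model of envelope matching; none of these names are Setak's API.
final case class Put(x: Int)
case object Get

// A processed message together with the names of its sender and receiver.
final case class Delivered(sender: String, receiver: String, message: Any)

// An envelope matches on sender, receiver, and a message pattern.
// `None` plays the role of the anyActorRef wildcard.
final case class EnvelopeModel(
    sender: Option[String],
    receiver: Option[String],
    pattern: PartialFunction[Any, Unit]) {

  def matches(d: Delivered): Boolean =
    sender.forall(_ == d.sender) &&
      receiver.forall(_ == d.receiver) &&
      pattern.isDefinedAt(d.message)
}

object EnvelopeModel {
  // Counterpart of the anyMessage wildcard: a pattern defined for every message.
  val anyMessage: PartialFunction[Any, Unit] = { case _ => }

  // Counts how many processed messages the envelope matches.
  def processingCount(env: EnvelopeModel, processed: Seq[Delivered]): Int =
    processed.count(env.matches)
}

object EnvelopeDemo {
  def main(args: Array[String]): Unit = {
    // Matches any Put(x) sent to "buf" by any sender, like the `put` envelope in the test.
    val put = EnvelopeModel(None, Some("buf"), { case Put(_) => })
    val processed = Seq(
      Delivered("test", "buf", Put(1)),
      Delivered("test", "buf", Put(2)),
      Delivered("consumer", "buf", Get))
    println(EnvelopeModel.processingCount(put, processed)) // 2
  }
}
```

Because the pattern ignores the payload, both Put(1) and Put(2) are counted even though only the first one changes the buffer, which is why the test asserts a processing count of 2 and a current size of 1.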
To write the above test in FlatSpec form, the test class should extend SetakFlatSpec, as shown below:
    class SpecTestBoundedBuffer extends SetakFlatSpec {
      var buf: TestActorRef = _
      var put: TestMessageEnvelop = _

      override def setUp() {
        buf = actorOf(new BoundedBuffer(1)).start
        put = testMessagePatternEnvelop(anyActorRef, buf, { case Put(_) ⇒ })
      }

      "the current size of buffer" should "be one and returns value 12" in {
        buf ! Put(12)
        whenStable {
          assert(buf.actorObject[BoundedBuffer].curSize == 1, "The current size is not one")
        }
        val getValue = (buf ? Get).mapTo[Int].get
        assert(getValue == 12, "The returned value is not 12")
      }

      "Both Put messages" should "be processed and only the first one should update the content" in {
        buf ! Put(1)
        buf ! Put(2)
        whenStable {
          assert(buf.actorObject[BoundedBuffer].curSize == 1, "The current size is not one")
          assert(processingCount(put) == 2, "Put has not been processed twice")
        }
      }
    }
To specify the methods that should be called before and after each test, override the setUp and tearDown methods, respectively.
Integration Testing
Consider an actor system with three actors: BoundedBuffer, Producer, and Consumer.
The code for Producer and Consumer is shown below:
    class Consumer(buf: ActorRef) extends Actor {
      var token: Int = -1

      def receive = {
        case Consume(count) ⇒ {
          for (i ← 1 to count) {
            token = (buf ? Get).get.asInstanceOf[Int]
          }
        }
        case GetToken ⇒ self.reply(token)
      }
    }

    class Producer(buf: ActorRef) extends Actor {
      def receive = {
        case Produce(values) ⇒ {
          values.foreach(v ⇒ buf ! Put(v))
        }
      }
    }
The consumer accepts a Consume(count) message and sends count Get messages to the buffer, one after another, storing each reply in its token field.
The consumer also accepts a GetToken message and replies with the last value received from the buffer. The default value of the token field is
-1.
The producer accepts a Produce(values) message with a list of numbers as its argument and sends a Put message to the buffer for each value
in the list.
We want to test the following scenario, which consists of two steps:
Step 1: The buffer receives a Get message from the consumer; check that the
value received by the consumer is -2 (an empty buffer returns -2).
Step 2: The buffer receives one message from the producer, Put(1), and one
message from the consumer, Get, in the order Put(1) -> Get; check that the
consumer receives 1.
This test is shown below in JUnit form. To specify the order of messages in the test execution, the messages sent to the buffer should be defined as TestMessageEnvelops.
Note that because the consumer sends the Get message via a future, the sender of the get message envelope is set to anyActorRef.
The setSchedule(po1, po2, ..., pon) API accepts a set of partial orders po1, po2, ..., pon as arguments. Each partial order is specified by using -> between test message
envelopes, e.g. setSchedule(tme1 -> tme2, tme3 -> tme4 -> tme5).
The rule for partial orders is that all test message envelopes in a partial order must have the same receiver. Since actors do not share their local state, only the order of messages sent to the same actor can affect the results.
    class TestBoundedBufferWithProducerConsumer extends SetakJUnit {

      @Test
      def testBufferWithProducerConsumer() {
        val buf = actorOf(new BoundedBuffer(1)).start
        val consumer = actorOf(new Consumer(buf)).start
        val producer = actorOf(new Producer(buf)).start

        val put1 = testMessageEnvelop(producer, buf, Put(1))
        val get = testMessageEnvelop(anyActorRef, buf, Get)

        //step 1
        consumer ! Consume(1)
        whenStable {
          assert((consumer ? GetToken).mapTo[Int].get == -2)
        }

        //step 2
        setSchedule(put1 -> get)
        producer ! Produce(List(1))
        consumer ! Consume(1)
        whenStable {
          assert((consumer ? GetToken).mapTo[Int].get == 1)
        }
      }
    }
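The partial-order rule for setSchedule can be pictured with a small model. The sketch below is an assumption-laden illustration, not Setak's scheduler: Env and ScheduleModel are hypothetical names, receivers are plain strings, and a partial order such as put1 -> get is represented as an ordered list. It checks two things: that a chain only contains envelopes with the same receiver, and that a given delivery trace respects the chain.

```scala
// Hypothetical model of schedule constraints; not Setak's implementation.
final case class Env(label: String, receiver: String)

object ScheduleModel {
  // A chain such as put1 -> get is modelled as an ordered list of envelopes.
  type Chain = List[Env]

  // The rule from the text: all envelopes in one chain target the same receiver.
  def wellFormed(chain: Chain): Boolean =
    chain.map(_.receiver).distinct.size <= 1

  // A delivery trace respects a chain if every envelope of the chain occurs
  // in the trace and the occurrences are in strictly increasing positions.
  def respects(trace: List[Env], chain: Chain): Boolean = {
    val positions = chain.map(e => trace.indexOf(e))
    !positions.contains(-1) &&
      positions == positions.sorted &&
      positions.distinct.size == positions.size
  }
}

object ScheduleDemo {
  def main(args: Array[String]): Unit = {
    val put1 = Env("put1", "buf")
    val get = Env("get", "buf")
    println(ScheduleModel.wellFormed(List(put1, get)))                 // true
    println(ScheduleModel.respects(List(put1, get), List(put1, get))) // true
    println(ScheduleModel.respects(List(get, put1), List(put1, get))) // false
  }
}
```

In step 2 of the test, only the first trace (Put(1) delivered before Get) lets the consumer receive 1; the reversed trace would return -2 from the empty buffer, which is the non-determinism that the schedule removes.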
New Feature: Support for Non-Stable Systems
There may be cases where the system never becomes stable, i.e. there are always some messages to be processed. We have added a feature to Setak to support those cases.
Two APIs, afterMessage(message) { body } and afterAllMessages { body }, can be used for this purpose. afterMessage(message) accepts a
test message envelope as input and executes the body after that message is processed.
afterAllMessages executes the body after all the test message envelopes in the system are processed. This feature is not limited to testing non-stable
systems: if all the messages are known and defined as test message envelopes, using afterAllMessages
instead of whenStable is an alternative and more reliable way of checking assertions in the system.
The following test case shows the usage of these two APIs:
    class TestBoundedBufferWithProducerConsumer extends SetakJUnit {

      @Test
      def testBufferWithProducerConsumer() {
        val buf = actorOf(new BoundedBuffer(1)).start
        val consumer = actorOf(new Consumer(buf)).start
        val producer = actorOf(new Producer(buf)).start

        val put1 = testMessageEnvelop(producer, buf, Put(1))
        val get1 = testMessageEnvelop(anyActorRef, buf, Get)
        val get2 = testMessageEnvelop(anyActorRef, buf, Get)

        //step 1
        consumer ! Consume(1)
        afterMessage(get1) {
          assert((consumer ? GetToken).mapTo[Int].get == -2)
        }

        //step 2
        setSchedule(put1 -> get2)
        producer ! Produce(List(1))
        consumer ! Consume(1)
        afterAllMessages {
          assert((consumer ? GetToken).mapTo[Int].get == 1)
        }
      }
    }
In step 1 of the above test, the test waits for get1 to be processed and then executes the body (checks the assertion). In step 2, it waits for all test messages,
i.e. put1 and get2, to be processed and then checks the assertion.
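The idea of running a body only after a particular message has been processed can be sketched with a latch. This is a hedged illustration rather than Setak's mechanism: AfterMessageModel and its afterMessage signature are hypothetical, and a java.util.concurrent.CountDownLatch stands in for whatever bookkeeping Setak uses internally to know that a message was processed.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical sketch of "run the body once a message has been processed".
// The latch stands in for Setak's internal bookkeeping, which the text does not describe.
object AfterMessageModel {
  // Waits until `processed` is counted down, then runs `body`.
  // Throws if the message is not processed within `timeoutMs` milliseconds.
  def afterMessage[A](processed: CountDownLatch, timeoutMs: Long)(body: => A): A = {
    if (!processed.await(timeoutMs, TimeUnit.MILLISECONDS))
      throw new IllegalStateException("message was not processed within " + timeoutMs + " ms")
    body
  }
}

object AfterMessageDemo {
  def main(args: Array[String]): Unit = {
    val processed = new CountDownLatch(1)
    val token = new AtomicInteger(-1)

    // Simulates an actor that processes the message on another thread.
    val worker = new Thread(new Runnable {
      def run(): Unit = { token.set(-2); processed.countDown() }
    })
    worker.start()

    // The body runs only after the worker has signalled that it is done.
    AfterMessageModel.afterMessage(processed, 2000) {
      println(token.get())
    }
  }
}
```

Waiting on a specific event, instead of waiting for the whole system to go quiet, is what makes this style usable when the system never becomes stable.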
In summary, Setak provides the following features:
- TestMessageEnvelop in the form of objects and patterns: Two factories, testMessageEnvelop and testMessagePatternEnvelop, can be used for this purpose.
  The wildcards for sender and receiver (anyActorRef) and for the message parameter (anyMessage) make it easy to define TestMessageEnvelops when the exact values
  are not known or do not matter.
- isProcessed, isDelivered, deliveryCount, and processingCount can be used to access
  the delivery and processing status of test message envelopes.
- When actorOf is called in the test class, the actors are created by the Setak factory (testActorRefFactory). This factory is
  also passed to each TestActorRef. Therefore, if you want to create actors inside the program, outside of the test class, you need to cast the ActorRef to TestActorRef and use the Setak factory
  to create them. An example of this case is shown below:
    class ParentActor extends Actor {
      def receive = {
        case 'create ⇒ {
          val child = akka.actor.Actor.actorOf(new Actor {
            def receive = { case _ ⇒ }
          })
          //....
        }
      }
    }
In order to test this program, you need to change it to the following:

    class ParentActor extends Actor {
      def receive = {
        case 'create ⇒ {
          val child = self.asInstanceOf[TestActorRef].testActorRefFactory.actorOf(new Actor {
            def receive = { case _ ⇒ }
          })
          //....
        }
      }
    }
- setSchedule can be used to specify the order of test message envelopes in the test execution,
  and hence to control the non-determinism in the test execution.
- When SetakFlatSpec or SetakWordSpec is used, overriding setUp and tearDown specifies the code that runs before and after each test. When SetakJUnit is used,
  the @Before and @After annotations can be used for this purpose.
- whenStable, afterMessage, and afterAllMessages methods: These
  methods accept a block of instructions that is executed when the system reaches a stable state, when a test message envelope
  is processed, or after all test message envelopes are processed, respectively.
  They allow the actors to process their messages before the test continues. This
  feature solves a common problem that users have while writing tests
  for concurrent programs, especially those with asynchronous events.
- Three parameters can be set in akka.setak.TestConfig: (1) maxTryForStability: the maximum number of
  times that the program is checked for stability; (2) sleepInterval: the interval between two consecutive stability checks, in milliseconds;
  and (3) timeOutForMessages: the timeout for waiting for messages to be processed. If the program does not become stable after
  the maximum number of tries, or the messages are not processed before the timeout, an exception is thrown.
  The default value is 20 for maxTryForStability, 100 milliseconds for sleepInterval, and
  2000 milliseconds for timeOutForMessages.
  To change these defaults, set these variables in your test class, e.g.
  akka.setak.TestConfig.maxTryForStability = 10, akka.setak.TestConfig.sleepInterval = 10000, etc.
In the "test" folder of the source code, you can find examples as well as the test cases written for Setak. Both show the usage of Setak.
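The interaction between maxTryForStability and sleepInterval described above can be pictured as a bounded polling loop. The sketch below is only that, a picture: StabilityModel, waitUntilStable, and the isStable callback are hypothetical names, and how Setak actually decides that a system is stable is not covered by this text. The default arguments reuse the documented defaults (20 checks, 100 milliseconds), so under this model a system that never stabilizes would be reported after roughly 20 × 100 ms = 2 seconds.

```scala
// Hypothetical polling loop; it only illustrates how the two parameters interact
// and is not Setak's actual stability check.
object StabilityModel {
  // Checks `isStable` up to `maxTry` times, sleeping `sleepIntervalMs` after each
  // failed check. Returns the number of checks performed, or throws if stability
  // is never observed.
  def waitUntilStable(maxTry: Int = 20, sleepIntervalMs: Long = 100)(isStable: () => Boolean): Int = {
    var tries = 0
    while (tries < maxTry) {
      tries += 1
      if (isStable()) return tries
      Thread.sleep(sleepIntervalMs)
    }
    throw new IllegalStateException("not stable after " + maxTry + " checks")
  }
}

object StabilityDemo {
  def main(args: Array[String]): Unit = {
    var pending = 3 // pretend three messages are still in flight
    val checks = StabilityModel.waitUntilStable(maxTry = 10, sleepIntervalMs = 10) { () =>
      if (pending > 0) pending -= 1
      pending == 0
    }
    println(checks) // 3
  }
}
```

Lowering maxTryForStability or sleepInterval shortens the time before a non-stable system is reported, at the cost of giving slow actors less time to drain their mailboxes.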