Thoughts, Tips and Tricks on what I'm currently doing for a living. Currently most of my spare time is spent contributing to Akka.NET.

Sunday, October 26, 2014

Integration Testing using Akka.NET's TestKit

In Getting Started with Akka.NET and Handling messages and state with Akka.NET we created a calculator in Akka.NET that was able to add and subtract numbers and return the answer. It also stores the last answer and can respond with it when asked.

In Unit Testing using Akka.NET’s TestKit we created some unit tests to verify the internal state of the actor.

In this post we’ll switch to integration tests – verifying we get correct messages back.

Code for this post: https://github.com/HCanber/akka.net-blog-examples/tree/master/04-calculator-testkit-integration

All posts in this series: Tutorials for Akka.NET

Note! This post was written using Akka.NET 0.7.0 and might not work for later versions

Unit Testing Recap

In Unit Testing using Akka.NET’s TestKit we created some unit tests to verify the internal state of the actor. We used ActorOfAsTestActorRef<T>(), which creates the actor in the test’s ActorSystem (exposed through the property Sys inside tests) and returns a TestActorRef<T>. That reference gives us access to the underlying actor instance through the property UnderlyingActor.

[Fact]
public void Answer_should_initially_be_0()
{           
    TestActorRef<CalculatorActor> calculatorRef = ActorOfAsTestActorRef<CalculatorActor>("calculator");
    CalculatorActor calculator = calculatorRef.UnderlyingActor;
    Assert.Equal(0, calculator.Answer);
}

[Fact]
public void After_adding_1_and_1_Answer_should_be_2()
{
    TestActorRef<CalculatorActor> calculatorRef = ActorOfAsTestActorRef<CalculatorActor>("calculator");
    calculatorRef.Tell(new Add(1,1));
    CalculatorActor calculator = calculatorRef.UnderlyingActor;
    Assert.Equal(2, calculator.Answer);
}

Remember that everything is synchronous when writing these kinds of tests. It’s the use of ActorOfAsTestActorRef<T>() that makes it synchronous. A CallingThreadDispatcher is used for actors created using ActorOfAsTestActorRef<T>(), so when we send it a message using Tell(message) it’s not dispatched on another thread, but instead immediately processed before Tell returns control back to our test.

Integration tests

When writing integration tests, we create and run the actor the way we normally do. This means multithreaded under full concurrency (actors are shielded by the Actor model: the actor only processes one message at a time).

The first test: GetLastAnswer should initially respond with Answer(0)

In our first unit test we verified that the internal state answer is 0 initially. We can also verify this by sending the actor a GetLastAnswer message and verifying that we get an Answer(0) back.

The skeleton for the test looks like this:

public class CalculatorIntegrationTests : TestKit
{
    [Fact]
    public void Answer_should_initially_be_0()
    {
        var calculator = ActorOf<CalculatorActor>("calculator");
        calculator.Tell(GetLastAnswer.Instance);

        //Somehow verify we get an Answer(0) back
    } 
}

So how can we verify that the calculator responds with what we expect? We could use Ask but there is a better way using TestKit.

TestActor

When the TestKit’s ActorSystem is created it also creates a special actor called TestActor. This instance is used as an implicit sender, so when we sent a message to calculator like this:

calculator.Tell(GetLastAnswer.Instance);

It was actually sent as if we’d specified TestActor as the sender:

calculator.Tell(GetLastAnswer.Instance, TestActor);

So when calculator sends the response it will send it to TestActor, which in turn will put the message in a queue that we can test against.

[Sequence diagram: Test sends GetLastAnswer (Sender=TestActor) to Calculator; Calculator replies with Answer(0) to TestActor; TestActor enqueues Answer(0) in its queue.]

To test that the queue contains the correct message (or, seen another way, that TestActor received the correct message) we use ExpectMsg<T>() and then assert that the value is correct.

[Fact]
public void Answer_should_initially_be_0()
{
    var calculator = ActorOf<CalculatorActor>("calculator");
    calculator.Tell(GetLastAnswer.Instance);

    var answer = ExpectMsg<Answer>();
    Assert.Equal(0, answer.Value);
}

The last two lines can also be written like this:

ExpectMsg<Answer>(a => a.Value == 0);

Isn’t it asynchronous?

We send a message to calculator which will process the message on another thread. At the same time we test that we have received a response. How can we be sure that the calculator has responded before we execute ExpectMsg<Answer>()? Don’t we need to synchronize somehow? Or is it running synchronously as with unit tests?

No, it really is asynchronous, BUT ExpectMsg will wait up to 3 seconds before it fails.
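Conceptually, this is like taking from a blocking queue with a timeout. The following is a minimal sketch of that idea in plain C#, not Akka.NET's actual implementation. The names ReplyQueueSketch, Expect and ReplyQueueDemo are hypothetical and exist only for this illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the queue behind TestActor; not part of Akka.NET.
public class ReplyQueueSketch
{
    private readonly BlockingCollection<object> _queue = new BlockingCollection<object>();

    // Called from the "actor side" when a reply arrives.
    public void Enqueue(object message)
    {
        _queue.Add(message);
    }

    // Blocks until a message arrives or the timeout elapses, then checks its type.
    public T Expect<T>(TimeSpan timeout)
    {
        object message;
        if (!_queue.TryTake(out message, timeout))
            throw new TimeoutException("No message received within " + timeout);
        if (!(message is T))
            throw new InvalidOperationException(
                "Expected " + typeof(T).Name + " but got " + message.GetType().Name);
        return (T)message;
    }
}

public static class ReplyQueueDemo
{
    public static double Run()
    {
        var queue = new ReplyQueueSketch();

        // The reply is produced on another thread, a little later.
        Task.Run(() =>
        {
            Thread.Sleep(100);
            queue.Enqueue(2.0);
        });

        // The caller simply waits; no extra synchronization is needed in the test.
        return queue.Expect<double>(TimeSpan.FromSeconds(3));
    }
}
```

The test thread blocks inside Expect until the reply shows up, so a test reads like sequential code even though the reply is produced on another thread.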

Verifying Add

Rewriting the second unit test as an integration test is really easy now that we have all the building blocks.

    [Fact]
    public void After_adding_1_and_1_Answer_should_be_2()
    {
        var calculator = ActorOf<CalculatorActor>("calculator");

        calculator.Tell(new Add(1, 1));
        var answer = ExpectMsg<Answer>();
        Assert.Equal(2, answer.Value);
    }

Be aware that the tests run in a full ActorSystem and everything is asynchronous even though it might look like synchronous code.

Code

Code on GitHub: https://github.com/HCanber/akka.net-blog-examples/tree/master/04-calculator-testkit-integration

All posts in this series: Tutorials for Akka.NET

Unit Testing using Akka.NET's TestKit

In Getting Started with Akka.NET and Handling messages and state with Akka.NET we created a calculator in Akka.NET that was able to add and subtract numbers and return the answer. It also stores the last answer and can respond with it when asked.

In this post we’ll introduce TestKit which will enable us to write unit and integration tests.

Code for this post: https://github.com/HCanber/akka.net-blog-examples/tree/master/03-calculator-testkit-unit

All posts in this series: Tutorials for Akka.NET

Note! This post was written using Akka.NET 0.7.0 and might not work for later versions

TestKit

TestKit is a module that allows us to write two types of tests:

  • Testing isolated pieces of code without involving the actor model, meaning without multiple threads; this implies completely deterministic behavior concerning the ordering of events and no concurrency concerns and will be called Unit Testing in the following.
  • Testing (multiple) encapsulated actors including multi-threaded scheduling; this implies non-deterministic order of events but shielding from concurrency concerns by the actor model and will be called Integration Testing in the following.

In Akka.NET, TestKit is distributed as two NuGet packages. First you need Akka.TestKit, which contains all base functionality, and then you need one targeted at a specific test framework. At the time of writing, we have NuGet packages for two frameworks, xUnit and VSTest/MSTest: Akka.TestKit.Xunit and Akka.TestKit.VsTest.

Add a test project

We’ll continue with the code from Handling messages and state with Akka.NET, which can be found on GitHub, and add an xUnit test project to it. The result can also be found on GitHub.

Start by adding a new Class Library project to the solution. We’ll call it Calculator.Tests. Delete Class1.cs from the project.

Add Akka.TestKit.Xunit to the project

Add the package Akka.TestKit.Xunit to the test project:

Install-Package Akka.TestKit.Xunit

This will install all packages needed (Akka, Akka.TestKit, xunit).
Add a reference to the console app project.

Create the first test class

Create a new class and let it inherit from TestKit:

using Akka.TestKit.Xunit;
using CalculatorApp;
using Xunit;

namespace Calculator.Tests
{
    public class CalculatorUnitTests : TestKit
    {
    }
}

Now we’re good to go!

TestKit’s test system

When writing tests using TestKit (by inheriting from TestKit) it creates an ActorSystem for us in which the tests are run, so you do not need to create one yourself.

The system can be accessed using Sys. This means that to create an actor in this system you’d do:

var calculator = Sys.ActorOf<CalculatorActor>("calculator");

Since this is done so frequently in tests you can skip the Sys property:

var calculator = ActorOf<CalculatorActor>("calculator");

Unit testing actors using TestActorRef

Unit testing actors means we have full control over the order in which messages are processed: there is no concurrency and everything is synchronous. TestKit also provides a way to actually verify the internal state of an actor.

Normally, when we create an actor like this we don’t get access to its internal state since the proxy ActorRef is returned:

ActorRef calculator = system.ActorOf<CalculatorActor>("calculator");

When using TestKit there is a method, ActorOfAsTestActorRef<TActor>(), that returns a TestActorRef<TActor>, which gives us access to the underlying actor instance via the property UnderlyingActor.

The first test: Internal state answer should be 0 initially

Right after a calculator has been created its internally stored answer should be 0. To verify this is true, we need to make some changes to CalculatorActor so we can access the answer value. The local variable answer is turned into a private field, and we add an Answer property:

public class CalculatorActor : ReceiveActor
{
    private double _answer;

    public CalculatorActor()
    {
        Receive<Add>(add =>
        {
            _answer = add.Term1 + add.Term2;
            Sender.Tell(new Answer(_answer));
        });

        Receive<Subtract>(sub =>
        {
            _answer = sub.Term1 - sub.Term2;
            Sender.Tell(new Answer(_answer));
        });

        Receive<GetLastAnswer>(m => Sender.Tell(new Answer(_answer)));
    }

    public double Answer { get { return _answer; } }
}

Now we can write the test:

[Fact]
public void Answer_should_initially_be_0()
{           
    TestActorRef<CalculatorActor> calculatorRef = ActorOfAsTestActorRef<CalculatorActor>("calculator");
    CalculatorActor calculator = calculatorRef.UnderlyingActor;
    Assert.Equal(0, calculator.Answer);
}

Instead of ActorOf<CalculatorActor> we use ActorOfAsTestActorRef<CalculatorActor> which returns TestActorRef<CalculatorActor>. This type exposes the underlying actor in the UnderlyingActor property.
Once we’ve got hold of it we verify that the Answer is 0.

If we run this test it passes.

Verifying that state is modified after handling messages

The CalculatorActor should always store the latest answer. So if we send it Add(1,1) the Answer property should be 2.

[Fact]
public void After_adding_1_and_1_Answer_should_be_2()
{
    TestActorRef<CalculatorActor> calculatorRef = ActorOfAsTestActorRef<CalculatorActor>("calculator");
    calculatorRef.Tell(new Add(1,1));
    CalculatorActor calculator = calculatorRef.UnderlyingActor;
    Assert.Equal(2, calculator.Answer);
}

Remember that everything is synchronous when writing these kinds of tests. It’s the use of ActorOfAsTestActorRef<T>() that makes it synchronous. A CallingThreadDispatcher is used for actors created using ActorOfAsTestActorRef<T>(), so when we send it a message using Tell(message) it’s not dispatched on another thread, but immediately processed before Tell returns control back to our test.

Code

Code on GitHub: https://github.com/HCanber/akka.net-blog-examples/tree/master/03-calculator-testkit-unit

All posts in this series: Tutorials for Akka.NET

Handling messages and state with Akka.NET

In Getting Started with Akka.NET we created a calculator in Akka.NET that was able to add two numbers and return the response.

In this post we’ll add more functionality to the calculator.

Code for this post: https://github.com/HCanber/akka.net-blog-examples/tree/master/02-calculator

All posts in this series: Tutorials for Akka.NET

Note! This post was written using Akka.NET 0.7.0 and might not work for later versions

Receiving more messages

The calculator should be able to subtract two values as well, so create a copy of the Add class and rename it to Subtract:

public class Subtract
{
    private readonly double _term1;
    private readonly double _term2;

    public Subtract(double term1, double term2)
    {
        _term1 = term1;
        _term2 = term2;
    }

    public double Term1 { get { return _term1; } }
    public double Term2 { get { return _term2; } }
}

We make the fields readonly to ensure the message is immutable.

Next, we need to tell CalculatorActor how to handle Subtract messages by adding another Receive statement:

public class CalculatorActor : ReceiveActor
{
    public CalculatorActor()
    {
        Receive<Add>(add => Sender.Tell(new Answer(add.Term1 + add.Term2)));
        Receive<Subtract>(sub => Sender.Tell(new Answer(sub.Term1 - sub.Term2)));
    }
}

You can have as many Receive statements as you like, but be aware that the order is important. Earlier handlers take precedence over later ones. In this case it doesn’t matter, but if we were to insert a Receive<object> first, it would handle ALL messages, and our handlers for Add and Subtract would never be called.
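To make the ordering rule concrete, here is a small model of "first matching handler wins" in plain C#. It is only a sketch under that assumption and not how ReceiveActor is implemented. HandlerListSketch and HandlerOrderDemo are hypothetical names, and a string message stands in for Add to keep the example short.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified model of ordered handlers; not Akka.NET's ReceiveActor.
public class HandlerListSketch
{
    private readonly List<KeyValuePair<Type, Func<object, string>>> _handlers =
        new List<KeyValuePair<Type, Func<object, string>>>();

    // Registers a handler for messages of type T, in call order.
    public void Receive<T>(Func<T, string> handler)
    {
        _handlers.Add(new KeyValuePair<Type, Func<object, string>>(
            typeof(T), message => handler((T)message)));
    }

    // Runs the first handler whose type matches; returns null if nothing matches.
    public string Handle(object message)
    {
        foreach (var entry in _handlers)
        {
            if (entry.Key.IsInstanceOfType(message))
                return entry.Value(message);
        }
        return null;
    }
}

public static class HandlerOrderDemo
{
    // The specific handler is registered first, so it gets the message.
    public static string SpecificFirst()
    {
        var handlers = new HandlerListSketch();
        handlers.Receive<string>(s => "string handler");
        handlers.Receive<object>(o => "object handler");
        return handlers.Handle("hello");
    }

    // The catch-all is registered first, so the specific handler is never reached.
    public static string ObjectFirst()
    {
        var handlers = new HandlerListSketch();
        handlers.Receive<object>(o => "object handler");
        handlers.Receive<string>(s => "string handler");
        return handlers.Handle("hello");
    }
}
```

In this model SpecificFirst() yields "string handler" while ObjectFirst() yields "object handler", which mirrors why a catch-all handler belongs last.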

Testing subtraction

We add two more lines to test subtraction:

static void Main(string[] args)
{
    var system = ActorSystem.Create("calculator-system");
    var calculator = system.ActorOf<CalculatorActor>("calculator");
    var answer = calculator.Ask<Answer>(new Add(1, 2)).Result;
    Console.WriteLine("1 + 2 = " + answer.Value);

    var answerSubtract = calculator.Ask<Answer>(new Subtract(5, 3)).Result;
    Console.WriteLine("5 - 3 = " + answerSubtract.Value);

    Console.WriteLine("Press any key to exit");
    Console.ReadKey();
}

Adding state

Let’s add some state to our CalculatorActor. Every time it has performed a calculation it should store the answer. We modify the actor to look like this.

    public class CalculatorActor : ReceiveActor
    {
        public CalculatorActor()
        {
            var answer = 0d;

            Receive<Add>(add =>
            {
                answer = add.Term1 + add.Term2;
                Sender.Tell(new Answer(answer));
            });

            Receive<Subtract>(sub =>
            {
                answer = sub.Term1 - sub.Term2;
                Sender.Tell(new Answer(answer));
            });
        }
    }

Notice that we do not need to declare any fields; the handlers can use a variable declared locally in the constructor.

Concurrency

Suppose we send Add(1,1) and Subtract(555,111) to the actor. The actor starts processing Add(1,1) and assigns answer=2, and at the same time Subtract(555,111) is calculated and assigns answer=444. When the handler of Add(1,1) creates the reply it will be Answer(444), which is clearly wrong.

How can we make sure this will not happen?

You don’t need to do anything. Akka.NET handles this for you. An actor can only process one message at a time. So if it has started processing Add(1,1) it will not start processing anything else until it has finished processing Add(1,1).
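One way to picture this guarantee is a mailbox that many threads can write to but only one loop reads from. The sketch below is plain C# under that assumption and says nothing about Akka.NET's internals. CalculatorMailboxSketch and MailboxDemo are hypothetical names, and to keep it short all sends finish before processing starts.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical, simplified mailbox; not how Akka.NET is implemented internally.
public class CalculatorMailboxSketch
{
    private readonly BlockingCollection<double[]> _mailbox = new BlockingCollection<double[]>();
    private double _answer; // only ever touched by the processing loop below

    public int Processed { get; private set; }

    // Any thread may enqueue a message.
    public void Tell(double term1, double term2)
    {
        _mailbox.Add(new[] { term1, term2 });
    }

    // Signals that no more messages will be sent, so ProcessAll can finish.
    public void Complete()
    {
        _mailbox.CompleteAdding();
    }

    // Handles messages strictly one after another; returns the number of wrong replies.
    public int ProcessAll()
    {
        var wrongReplies = 0;
        foreach (var message in _mailbox.GetConsumingEnumerable())
        {
            _answer = message[0] + message[1];
            var reply = _answer; // nothing else can change _answer in between
            if (reply != message[0] + message[1]) wrongReplies++;
            Processed++;
        }
        return wrongReplies;
    }
}

public static class MailboxDemo
{
    public static int Run()
    {
        var calculator = new CalculatorMailboxSketch();

        // Many threads send concurrently...
        Parallel.For(0, 1000, i => calculator.Tell(i, 1));
        calculator.Complete();

        // ...but the handler still sees one message at a time.
        return calculator.ProcessAll();
    }
}
```

Because only the single processing loop touches _answer, no locks are needed around it, which is the same reason the actor's state needs no locking.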

Retrieve state

So how can we get the latest answer from CalculatorActor now that answer is stored in a local variable inside the constructor? Can we make it a public field and then use that to access the value?

No, that will not help! Remember this line when we created the actor:

ActorRef calculator = system.ActorOf<CalculatorActor>("calculator");

ActorOf returns an ActorRef, not a CalculatorActor, so we do not have access to public members on CalculatorActor. And that’s a good thing, as having such access would open the door to sharing state.

Returning to one of the fundamentals of Actor based systems:

The only way to communicate with an actor is by messages.

So we need to send it a message GetLastAnswer and let it reply with an Answer message.

First the new message. Since it doesn’t store any values, we create it as a singleton:

public class GetLastAnswer
{
    private static readonly GetLastAnswer _instance = new GetLastAnswer();
    private GetLastAnswer() {}
    public static GetLastAnswer Instance { get { return _instance; } }
}

Next we declare what CalculatorActor should do when it receives a GetLastAnswer:

Receive<GetLastAnswer>(m => Sender.Tell(new Answer(answer)));

And then to test it, from Main() we ask for the last answer after the subtraction:

var answerSubtract = calculator.Ask<Answer>(new Subtract(5, 3)).Result;
Console.WriteLine("5 - 3 = " + answerSubtract.Value);

var lastAnswer = calculator.Ask<Answer>(GetLastAnswer.Instance).Result;
Console.WriteLine("Last answer = " + lastAnswer.Value);

If you run this you should see:

1 + 2 = 3
5 - 3 = 2
Last answer = 2

Code

One file containing all code:
https://github.com/HCanber/akka.net-blog-examples/blob/master/02-calculator/CalculatorApp/Program.cs

Entire project: https://github.com/HCanber/akka.net-blog-examples/tree/master/02-calculator

All posts in this series: Tutorials for Akka.NET

Getting Started with Akka.NET

Akka.NET is a toolkit and runtime for building highly concurrent, distributed, and fault tolerant event-driven applications on .NET and Mono. Instead of sharing state and managing concurrency ourselves, Akka.NET handles this for us. We write the code in actors, which have private state, run in a single thread, and interact with the outside world via messages.

More information on Akka.NET can be found at http://akkadotnet.github.io/

Code for this post: https://github.com/HCanber/akka.net-blog-examples/tree/master/01-calculator
All posts in this series: Tutorials for Akka.NET

Note! This post explains how to create a simple calculator in Akka.NET. It was written when Akka.NET was in version 0.7.0 and might not work for later versions

Create the project

Start by creating a new Console App in Visual Studio.
Install the nuget package Akka

PM> Install-Package Akka

This is all that’s needed to start writing code for Akka.NET.

Create message classes

The only way to communicate with an actor is by messages (simple POCO classes).
In this example we’ll send an Add message containing two numbers to the calculator and we’ll get the answer as an Answer message.

[Sequence diagram: Program sends Add(1, 2) to Calculator; Calculator calculates 1+2 and replies with Answer(3).]

We define two classes to be our messages.

public class Add
{
    private readonly double _term1;
    private readonly double _term2;

    public Add(double term1, double term2)
    {
        _term1 = term1;
        _term2 = term2;
    }

    public double Term1 { get { return _term1; } }
    public double Term2 { get { return _term2; } }
}

public class Answer
{
    private readonly double _value;

    public Answer(double value)
    {
        _value = value;
    }

    public double Value { get { return _value; } }
}

Why not Auto properties?

All messages must be immutable, otherwise we open up for sharing state. Readonly fields ensure the messages are immutable.
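To see what could go wrong with a mutable message, consider this small sketch in plain C#. MutableAdd and SharedStateDemo are hypothetical names used only for illustration, and a Queue stands in for an actor's mailbox.

```csharp
using System.Collections.Generic;

// Hypothetical mutable message, shown only to illustrate the problem.
public class MutableAdd
{
    public double Term1 { get; set; }
    public double Term2 { get; set; }
}

public static class SharedStateDemo
{
    public static double Run()
    {
        var mailbox = new Queue<MutableAdd>();   // stand-in for an actor's mailbox

        var message = new MutableAdd { Term1 = 1, Term2 = 2 };
        mailbox.Enqueue(message);                // "send" the message

        message.Term1 = 100;                     // the sender changes it afterwards

        var received = mailbox.Dequeue();        // the receiver handles it later
        return received.Term1 + received.Term2;  // 102, not the 3 the sender meant
    }
}
```

Sender and receiver hold the same object, so a change made after sending leaks into the receiver. With readonly fields set in the constructor, as in Add and Answer above, that change does not compile.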

The Calculator actor

Next is the CalculatorActor. We’ll start off with an empty class, and inherit from ReceiveActor:

using Akka.Actor;

public class CalculatorActor : ReceiveActor
{
    public CalculatorActor()
    {       
    }
}

That’s all we need for the CalculatorActor for now. We’ll come back to it later.

Create the ActorSystem

Actors are run in an ActorSystem and we need to create one.

using Akka.Actor;

namespace CalculatorApp
{
    class Program
    {
        static void Main(string[] args)
        {
            var system = ActorSystem.Create("calculator-system");
        }
    }
}

Create a CalculatorActor instance

Now that we have a system, we can create a new instance of CalculatorActor using system.ActorOf.

var system = ActorSystem.Create("calculator-system");
ActorRef calculator = system.ActorOf<CalculatorActor>("calculator");

In this example we’ve also given it the name "calculator". This is optional; however, since every actor must have a name, an actor you don’t name is given a generated one. A good tip is to always name your actors. It will help when debugging and when reading logs.

Send a message to the calculator

As you see above, ActorOf returns an ActorRef which acts as a proxy for the actual actor instance. This will allow us to communicate with the actor, without giving us access to any members exposed by the actor class.
The next step is to send it an Add message.

ActorRef calculator = system.ActorOf<CalculatorActor>("calculator");
calculator.Tell(new Add(1, 2));

This will asynchronously send the Add message to the calculator actor. But we haven’t defined how the actor should deal with messages so let’s do that.

Handling messages

The ReceiveActor allows us to declare what will happen when different messages are received, and we do that in the constructor:

public class CalculatorActor : ReceiveActor
{
    public CalculatorActor()
    {
        Receive<Add>(add => Sender.Tell(new Answer(add.Term1 + add.Term2)));
    }
}

The Receive statement says: When the actor receives a message of type Add respond to the sender with an Answer message.

Receiving responses

Next step is to receive the Answer and print it to the console.

One way to interact with actors from the outside and receive responses is to use Ask instead of Tell. It returns a Task that we can await until a response is received.

So we switch to Ask, await an Answer (using the Result property which will block the current thread) and then print the result.

static void Main(string[] args)
{
    var system = ActorSystem.Create("calculator-system");
    var calculator = system.ActorOf<CalculatorActor>("calculator");
    var answer = calculator.Ask<Answer>(new Add(1, 2)).Result;
    Console.WriteLine("Answer: " + answer.Value);

    Console.WriteLine("Press any key to exit");
    Console.ReadKey();
}

That’s it. Go ahead and run it. You should see:

Answer: 3
Press any key to exit

A note on Ask

While Ask is very useful when interacting with actors from the outside, be aware that there are performance implications of using Ask compared to using Tell. So always prefer Tell for performance, and only Ask if you must.

Code

One file containing all code:
https://github.com/HCanber/akka.net-blog-examples/blob/master/01-calculator/CalculatorApp/Program.cs

Entire project: https://github.com/HCanber/akka.net-blog-examples/tree/master/01-calculator

Next

Next: Handling messages and state
All posts in this series: Tutorials for Akka.NET

Friday, January 10, 2014

Building Kafka 0.8.0 from sources on Windows

Prerequisites – Java JDK

Although Kafka is written in Scala, you do not need to install Scala. Only the Java JDK is required.

Download Java JDK:
http://www.oracle.com/technetwork/java/javase/downloads/index.html

Install it and make sure you update the PATH environment variable according to instructions:
http://docs.oracle.com/javase/7/docs/webnotes/install/windows/jdk-installation-windows.html

Get the source

Do either the Clone-and-Checkout step or the Download step:

Build

Execute:

.\sbt.bat update
.\sbt.bat package
.\sbt.bat assembly-package-dependency

Update the bat files

Unfortunately the bat files for Kafka 0.8.0 are full of errors, so in order to start Zookeeper and Kafka they must be replaced. See Step 1, Update the bat files.

Update config for Windows

Update the config files according to Step 1, Update Config.

Run

To run, see Step 2, Start the server.

 

See Running Kafka 0.8.0 on Windows for more info on how to set up and run Kafka using the binary distribution.

Kafka 0.8.0 on Windows

Getting Kafka 0.8.0 running on Windows isn’t straightforward if you follow the instructions. They are somewhat misleading, and the bat files are old. But with correct instructions and updated bat files it’s easy and can be done in under 10 minutes. Some say you need Cygwin in order to run Kafka. This is not true. Only Server JRE is required.

Step 0. Prerequisite – Java SE Server JRE

You need Java SE Server JRE in order to run Kafka. If you have JDK installed, you already have Server JRE installed.

  1. Download Java SE Server JRE
    http://www.oracle.com/technetwork/java/javase/downloads/index.html
    For me Chrome changed the extension. If that happens change it back to .tar.gz in order to unpack it.
  2. Unpack it to a folder, for example c:\JreServer
    Update the system environment variable PATH to include C:\JreServer\jre\bin (Control Panel and search for environment variable).

Step 1. Download Kafka

  1. Download the binaries for Kafka from http://kafka.apache.org/downloads.html
  2. Unzip to a folder, for example c:\kafka

Update the bat files

Unfortunately the bat files for Kafka 0.8.0 are full of errors, so in order to start Zookeeper and Kafka they must be replaced.

  1. Download updated windows bat files from https://github.com/HCanber/kafka/releases
  2. Unzip and copy them into c:\kafka\bin\windows (overwrite the files already there)

Update Config

The config files need to be updated.

  1. Open config\server.properties and locate log.dirs=/tmp/kafka-logs. If you keep the default it will result in an error later on. Set it to a full path without . or .. in it and with forward slashes. Example:
    log.dirs=c:/kafka/kafka-logs
  2. This step is optional but you might want to set the data directory for Zookeeper as well. Open config\zookeeper.properties and locate dataDir=/tmp/zookeeper. Example:
    dataDir=c:/kafka/zookeeper-data

Step 2. Start the server

Note! Before executing the commands below you need to change directory to the folder where you unzipped Kafka:

cd c:\kafka
  1. Start Zookeeper in a Command Prompt or PowerShell Console.
    .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
  2. Start Kafka in another Command Prompt or PowerShell Console.
    .\bin\windows\kafka-server-start.bat .\config\server.properties

Step 3. Create a topic

  1. Create a topic.
    .\bin\windows\kafka-create-topic.bat --zookeeper localhost:2181 --replica 1 --partition 1 --topic test
  2. List topics.
    .\bin\windows\kafka-list-topic.bat --zookeeper localhost:2181

Step 4. Send some messages

  1. Start Console Producer
    .\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
  2. Write some messages
    This is a message 
    This is another message

Step 5. Start a consumer

  1. Start Console Consumer
    .\bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic test --from-beginning

 

Building from sources

See Building Kafka 0.8.0 from sources on Windows