onsdag 7 april 2010

Actors in java with akka, cont persistence

Ok, maybe we want our messages to be stored in a database in case something goes wrong?
I will use redis, so you will need to setup a local redis server. this is very simple read more on redis homepage: http://code.google.com/p/redis/
if you prefer cassandra or any other nosql database akka supports (check http://doc.akkasource.org/persistence ) you simply have to change RedisStorage to your storage solution.

Adding storage is usually allot of work. or at least some work. in akka, it is trivial:

in your maven pom.xml add:
<dependency>
<groupid>se.scalablesolutions.akka</groupId>
<artifactid>akka-persistence-redis_2.8.0.Beta1</artifactId>
<version>0.8</version>
</dependency>
this will include the dependencies we need to store everything.

in Chatserver.java


@transactionrequired
public class ChatServer {
    @Inject
    public ChatServer() {
    }
    private List<Message> messages = new ArrayList<Message>();
    private Set<ChatSession> sessions = new HashSet<ChatSession>();
    private PersistentVector<byte[]> storage;

    @inittransactionalstate
    public void init() {
        storage = RedisStorage.getVector("Storage");
    }


this is what changed:
i added an @transactionrequired annotation to the class, this means that all methods in ChatServer will be done in a stm transaction. you can read more about that in the akka docs: http://doc.akkasource.org/transactors

Then i changed the Vector to a PersistentVector. this is a datatype provided by akka, that stores its content in a database. There are several databases supported, Cassandra and redis are two examples. they all work with the same Peristent Datastructures, in addition to Vector there are for ex: PersistentMap and PersistentQueue.

I initiate this Vector in a method with the @inittransactionalstate annotation, this method should be called by magic by akka, but its currently not. so until a very near future when that is fixed, i decided to call that method in the login method in ChatSession.

Thats it, our messages are now stored in a redis database. if you installed redis with default settings, and use the default config for akka, it will find the database without any configuration.

Actors in java with akka, cont

I continue where my last post left us.
We now have a fully working chat, we can send messages and we will recieve messages that other users send.
The system is far from perfect. Lets try to improve it.

When i send a message, i have to wait until the server has sent it out to all the users before i can send another message, or look at the log. We could just wrap it in a Runnable and use a executor to make the call async. But that is pretty uggly code, and it can soon start to lead to problems.
Instead, lets use akka to introduce actors. if you want to know why and what actors are, there are alot of reading on the web. you can start with akka's homepage for ex: www.akkasource.org

first, we need to add the akka repository and dependency to our maven pom.

   <repositories>
        <repository>
            <id>akka.repository</id>
            <name>Akka Maven Repository</name>
            <url>http://scalablesolutions.se/akka/repository</url>
        </repository>
    </repositories>
and
        <dependency>
            <groupId>se.scalablesolutions.akka</groupId>
            <artifactId>akka-core_2.8.0.Beta1</artifactId>
            <version>0.8</version>
        </dependency>


now lets change some code, remember the testcase we had? with the Guice Injector, lets rewrite that.


public class IntegrationTest {

    ActiveObjectConfigurator conf = new ActiveObjectConfigurator();

    @Before
    public void setup() {
        MessageDispatcher dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("test");
        conf.addExternalGuiceModule(new AbstractModule() {

            @Override
            protected void configure() {
                bind(ChatSession.class);
            }
        }).configure(new RestartStrategy(new AllForOne(), 3, 5000, new Class[]{Exception.class}),
                new Component[]{
                    new Component(ChatServer.class, new LifeCycle(new Permanent()), 10000, dispatcher)}).inject().supervise();
    }

    @Test
    public void integrationTest() throws InterruptedException {
        final ChatSession userOne = conf.getExternalDependency(ChatSession.class);
        userOne.setUserName("userOne");
        userOne.login();
        final ChatSession userTwo = conf.getExternalDependency(ChatSession.class);
        final ChatSession userThree = conf.getExternalDependency(ChatSession.class);
        userTwo.setUserName("userTwo");
        userTwo.login();
        Thread.sleep(100); //to ensure login is done
        userOne.sendMessage("Hello");
        userTwo.sendMessage("Another Message 1");
        assertEquals(2, userOne.getLog().size());
        assertEquals(2, userTwo.getLog().size());
        assertEquals(userTwo.getLog(), userOne.getLog());

    }
}


As you can see, not much have changed.
We still have our guiceModule where we configure ChatSession.
But now we have a ActiveObjectConfigurator, we could call this an extension to guice. For now, ignore all the Strange things like RestartStrategy and LifeCycles we will look at them later.
lets look at the integrationTest method, the only difference here is that instead of creating a GuiceInjector, we use the ActiveObjectConfigurator, and with getExternalDependency, we can get an instance of ChatSession. But now, every method is fire and forget. ie, after we call the method we will return immediatly.

Now we have a much more scalable solution. next, it is time to store our messages in a database.

Actors in java with akka

I am currently working my way through the akka tutorial, except that im using java instead of scala. I am also trying to make it in smaller steps, and trying to keep it runnable as much as possible. I have removed import statements and such to make it shorter, the full code can be found at github: http://github.com/bobo/akka_sample_java

Lets start withough any akka or other magic, except for Guice.
I started by creating a chatserver:


public class ChatServer {

    @Inject
    public ChatServer() {
    }

    private List<Message> messages = new ArrayList<Message>();
    private Set<ChatSession> sessions = new HashSet<ChatSession>();
    private Vector<byte[]> storage = new Vector<byte[]>();

    public void login(ChatSession chatSession) {
        sessions.add(chatSession);
    }

    public List<Message> getLog() {
        return Collections.unmodifiableList(messages);
    }

    public void sendMessage(Message message) {
        storeMessage(message);
        for (ChatSession chatSession : sessions) {
            chatSession.recieveMessage(message);
        }
    }

    private void storeMessage(Message message) {
        messages.add(message);
        storage.add(message.toString().getBytes());
    }
}

This is just a simple class, that stores messages in byteformat in a vector.
It might seem weird to have the Vector store byte[] instead of message, but later we will change it to a PersistentVector, and that uses byte[] to store it in the backend.

And we need some clients:



public class ChatSession {

    private String userName;
    private final List<Message> messages = new ArrayList<Message>();
    @Inject
    private ChatServer chatServer;

    public void login() {
        chatServer.login(this);
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getUserName() {
        return userName;
    }

    public void recieveMessage(Message message) {
        messages.add(message);
    }

    public void sendMessage(String text) {
        Message message = new Message(userName, text);
        chatServer.sendMessage(message);
    }

    public List<Message> getLog() {
        return messages;
    }
}


Here we inject the Chatserver with @Inject directly on the variable declaration

we can use it in a simple testcase:



public class IntegrationTest {
    Module m;
    @Before
    public void setup() {
        m = new AbstractModule() {
            @Override
            protected void configure() {
                bind(ChatSession.class);
                bind(ChatServer.class).asEagerSingleton();
            }
        };
    }

    @Test
    public void integrationTest() throws InterruptedException {
        Injector inj = Guice.createInjector(m);
        final ChatSession userOne = inj.getInstance(ChatSession.class);
        userOne.setUserName("userOne");
        userOne.login();
        final ChatSession userTwo = inj.getInstance(ChatSession.class);
        userTwo.setUserName("userTwo");
        userTwo.login();
        userOne.sendMessage("Hello");
        userTwo.sendMessage("Another Message 1");
        Thread.sleep(400); //wait for messages to finish
        assertEquals(2, userOne.getLog().size());
        assertEquals(2, userTwo.getLog().size());
        assertEquals(userTwo.getLog(), userOne.getLog());
    }
}


Here we create two chatsessions with our guice injector. And since we have bound ChatServer as a singleton they will have the same instance of chatserver injected.


Now the next step is to add akka to the project. i will add that to a seperate post