OneForOneStrategy监控策略&Actor测试

示例代码:

package com.zte.sunquan.demo.actor.ofo;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.OneForOneStrategy;
import akka.actor.SupervisorStrategy;
import akka.actor.SupervisorStrategy.Directive;
import akka.japi.Function;
import scala.concurrent.duration.Duration;

import static akka.actor.SupervisorStrategy.escalate;
import static akka.actor.SupervisorStrategy.restart;
import static akka.actor.SupervisorStrategy.resume;
import static akka.actor.SupervisorStrategy.stop;


public class SupervisorActor extends AbstractActor {

    public ActorRef childActor;//子actor,异常在子actor抛出

    public SupervisorActor() {
        childActor = getContext().actorOf(WorkerActor.props(),
                "workerActor");
    }

    //这里我们对重启的频率作了限制,最多5秒内能进行 3 次重启
    private static SupervisorStrategy strategy = new OneForOneStrategy(3,
            Duration.create("5 second"), new Function<Throwable, Directive>() {
        public Directive apply(Throwable t) {
            if (t instanceof ArithmeticException) {
                return resume();
            } else if (t instanceof NullPointerException) {
                return restart();
            } else if (t instanceof IllegalArgumentException) {
                return stop();
            } else {
                return escalate();
            }
        }
    });

    @Override
    public SupervisorStrategy supervisorStrategy() {
        return strategy;
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder().match(Result.class, p -> {
            childActor.tell(p, getSender());
        }).matchAny(p -> {
            childActor.tell(p, ActorRef.noSender());
        })
                .build();
    }
}

package com.zte.sunquan.demo.actor.ofo;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;

public class WorkerActor extends AbstractActor {
    LoggingAdapter log = Logging.getLogger(getContext().system(), this);
    private int state = 0;

    public static Props props() {
        return Props.create(WorkerActor.class);
    }

    @Override
    public void preStart() {
        log.info("Starting WorkerActor instance hashcode # {}", this.hashCode());
    }

    @Override
    public void postStop() {
        log.info("Stopping WorkerActor instance hashcode # {}", this.hashCode());

    }

    @Override
    public Receive createReceive() {
        return receiveBuilder().match(String.class, p -> {
            throw new NullPointerException("Null Value Passed");
        }).match(Integer.class, p -> {
            if (p <= 0) {
                throw new ArithmeticException("Number equal or less than zero");
            } else
                state = p;
        }).match(Result.class, p -> {
            getSender().tell(state, ActorRef.noSender());
        }).matchAny(p -> {
            throw new IllegalArgumentException("Wrong Argument");
        })
                .build();
    }
}

package com.zte.sunquan.demo.actor.ofo;

public class Result {

}

package com.zte.sunquan.demo.actor.ofo;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.pattern.Patterns;
import akka.testkit.TestActorRef;
import akka.testkit.TestKit;
import akka.testkit.TestProbe;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;

import java.util.concurrent.TimeUnit;

public class SupervisorTest extends TestKit {
    static ActorSystem _system = ActorSystem.create("faultTolerance");
    TestActorRef<SupervisorActor> supervisor = TestActorRef
            .apply(Props.create(SupervisorActor.class), _system);

    public SupervisorTest() {
        super(_system);
        supervisor.tell(Integer.valueOf(8), ActorRef.noSender());
    }

    @Test
    public void successTest() throws Exception {
        //正数
        supervisor.tell(Integer.valueOf(8), ActorRef.noSender());
        Integer result = (Integer) Await.result(
                Patterns.ask(supervisor, new Result(), 5000),
                Duration.create(5000, TimeUnit.MILLISECONDS));
        assert result.equals(Integer.valueOf(8));
    }

    @Test
    public void resumeTest() throws Exception {
        TestActorRef<SupervisorActor> supervisor = TestActorRef.apply(
                Props.create(SupervisorActor.class), _system);
        //first send a correct message
        supervisor.tell(Integer.valueOf(8), ActorRef.noSender());
        //Send a  message that generates exception  ArithmeticException
        supervisor.tell(Integer.valueOf(-8), ActorRef.noSender());

        Integer result = (Integer) Await.result(
                Patterns.ask(supervisor, new Result(), 5000),
                Duration.create(5000, TimeUnit.MILLISECONDS));

        assert result.equals(Integer.valueOf(8));
    }

    @Test
    public void restartTest() throws Exception {
        supervisor.tell("null", ActorRef.noSender());

        Integer result = (Integer) Await.result(
                Patterns.ask(supervisor, new Result(), 5000),
                Duration.create(5000, TimeUnit.MILLISECONDS));

        assert result.equals(Integer.valueOf(0));
    }

    @Test
    public void restartTestInMaxRetry() throws Exception {
        supervisor.tell("null", ActorRef.noSender());

        Integer result = (Integer) Await.result(
                Patterns.ask(supervisor, new Result(), 5000),
                Duration.create(5000, TimeUnit.MILLISECONDS));

        assert result.equals(Integer.valueOf(0));

        //由oneforone的创建参数,5s内只允许3次重启
        supervisor.tell("null", ActorRef.noSender());
        Thread.sleep(2000);
        supervisor.tell("null", ActorRef.noSender());
        supervisor.tell("null", ActorRef.noSender());

        ActorRef workerActor = supervisor.underlyingActor().childActor;
        TestProbe probe = new TestProbe(_system);
        probe.watch(workerActor);
        probe.expectMsgClass(Terminated.class);
        //
        Thread.sleep(5000);
        assert supervisor.underlyingActor().childActor.isTerminated();
    }


    @Test
    public void stopTest() throws Exception {

        ActorRef workerActor = supervisor.underlyingActor().childActor;
        TestProbe probe = new TestProbe(_system);
        probe.watch(workerActor);

        supervisor.tell(Long.parseLong("10"), ActorRef.noSender());

        probe.expectMsgClass(Terminated.class);
    }
}


从上面的测试用例中,对于Actor的测试,使用了

TestActorRef:用于获取Actor对象,从而能够获取actor中的属性,使用方法underlyingActor

TestProb:用于判断actor接收消息expectMsg


示例参考:

package com.zte.sunquan.pipe;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.testkit.TestActor;
import akka.testkit.TestActorRef;
import akka.testkit.TestProbe;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Created by sunquan on 2017/12/19.
 *    Akka-testkit 的主要工具包括,
 *    1) testProbe 用于测试 Actor 回应和发送消息,testActor 用于简便情况下测试 Actor 回应消息,
 *    2) testActorRef 用于测试 Actor 内部状态的改变。
 */
public class PipeActorTest {
    private static ActorSystem system;

    @BeforeClass
    public static void setUpClass() throws Exception {
        System.setProperty("shard.persistent", "false");
        system = ActorSystem.create("test");
    }
    @Test
    public void testActor1() throws InterruptedException {
        //回应消息的测试
        TestProbe testProbe=new TestProbe(system);
        ActorRef actorRef = system.actorOf(FirstActor.props(), "sunquan");
        testProbe.send(actorRef,"sunquan");
        testProbe.expectMsg("success");
        Thread.sleep(5000);
    }
    @Test
    public void testActor2() throws InterruptedException {
        TestActorRef<FirstActor> actorRef = TestActorRef
                .create(system, FirstActor.props(), "sunquan");
        actorRef.tell("sunquan", ActorRef.noSender());
        //actor内部状态的测试
        Assert.assertEquals(actorRef.underlyingActor().getName(), "sunquan");
        Thread.sleep(5000);
    }

}


用到的POM

        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-testkit_2.11</artifactId>
            <version>2.4.8</version>
        </dependency>

        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-remote_2.11</artifactId>
            <version>2.4.18</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-slf4j_2.11</artifactId>
            <version>2.4.18</version>
        </dependency>
相关文章
相关标签/搜索