如何使用多个WCF服务实例强制实施消息队列序列

我想创建一个使用MSMQ绑定的WCF服务,因为我有大量的服务要处理的通知.重要的是客户端不会被服务阻止,并且通知按照引发的顺序进行处理,从而实现队列实现.

另一个考虑因素是弹性.我知道我可以集群MSMQ本身使队列更健壮,但我希望能够在不同的服务器上运行我的服务实例,所以如果服务器崩溃通知不会在队列中建立,而另一台服务器继续处理.

我已经尝试了MSMQ绑定,并发现您可以让多个服务实例在同一个队列上进行侦听,并且自己离开时最终会进行一种循环,其中负载分布在可用服务上.这很好,但我最终失去了队列的顺序,因为不同的实例需要不同的时间来处理请求.

我一直在使用一个简单的控制台应用程序进行实验,这是下面的史诗代码转储.当它运行时,我得到这样的输出:

host1 open
host2 open
S1: 01
S1: 03
S1: 05
S2: 02
S1: 06
S1: 08
S1: 09
S2: 04
S1: 10
host1 closed
S2: 07
host2 closed

我想要发生的是:

host1 open
host2 open
S1: 01
<pause while S2 completes>
S2: 02
S1: 03
<pause while S2 completes>
S2: 04
S1: 05
S1: 06
etc.

我原以为,由于S2还没有完成,它可能仍会失败,并将正在处理的消息返回给队列.因此,不应允许S1从队列中拉出另一条消息.我的队列是事务性的,我已尝试在服务上设置TransactionScopeRequired = true但无济于事.

这有可能吗?我是以错误的方式去做的吗?是否有其他方法可以在没有某种中央同步机制的情况下构建故障转移服务?

class WcfMsmqProgram
{
    private const string QueueName = "testq1";

    static void Main()
    {
        // Create a transactional queue
        string qPath = ".\\private$\\" + QueueName;
        if (!MessageQueue.Exists(qPath))
            MessageQueue.Create(qPath, true);
        else
            new MessageQueue(qPath).Purge();

        // S1 processes as fast as it can
        IService s1 = new ServiceImpl("S1");
        // S2 is slow
        IService s2 = new ServiceImpl("S2", 2000);

        // MSMQ binding
        NetMsmqBinding binding = new NetMsmqBinding(NetMsmqSecurityMode.None);

        // Host S1
        ServiceHost host1 = new ServiceHost(s1, new Uri("net.msmq://localhost/private"));
        ConfigureService(host1, binding);
        host1.Open();
        Console.WriteLine("host1 open");

        // Host S2
        ServiceHost host2 = new ServiceHost(s2, new Uri("net.msmq://localhost/private"));
        ConfigureService(host2, binding);
        host2.Open();
        Console.WriteLine("host2 open");

        // Create a client 
        ChannelFactory<IService> factory = new ChannelFactory<IService>(binding, new EndpointAddress("net.msmq://localhost/private/" + QueueName));
        IService client = factory.CreateChannel();

        // Periodically call the service with a new number
        int counter = 1;
        using (Timer t = new Timer(o => client.EchoNumber(counter++), null, 0, 500))
        {
            // Enter to stop
            Console.ReadLine();
        }

        host1.Close();
        Console.WriteLine("host1 closed");
        host2.Close();
        Console.WriteLine("host2 closed");

        // Wait for exit
        Console.ReadLine();
    }

    static void ConfigureService(ServiceHost host, NetMsmqBinding binding)
    {
        var endpoint = host.AddServiceEndpoint(typeof(IService), binding, QueueName);
    }

    [ServiceContract]
    interface IService
    {
        [OperationContract(IsOneWay = true)]
        void EchoNumber(int number);
    }

    [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
    class ServiceImpl : IService
    {
        public ServiceImpl(string name, int sleep = 0)
        {
            this.name = name;
            this.sleep = sleep;
        }

        private string name;
        private int sleep;

        public void EchoNumber(int number)
        {
            Thread.Sleep(this.sleep);
            Console.WriteLine("{0}: {1:00}", this.name, number);
        }
    }
}
batwad,

您正在尝试手动创建服务总线.你为什么不尝试使用现有的?

NServiceBus,MassTransit,ServiceStack

其中至少有2个与MSMQ合作.

此外,如果您绝对需要订购,实际上可能是出于另一个原因 – 您希望能够发送消息,并且您不希望在第一条消息之前处理相关消息.您正在寻找Saga Pattern. NServiceBus和MassTransit都允许您轻松管理Sagas,它们都允许您简单地触发初始消息,然后根据条件触发剩余消息.它将允许您快速实现分布式应用程序的丰满.

然后,您甚至可以扩展到数千个客户端,队列服务器和消息处理器,而无需编写任何代码,也不会出现任何问题.

我们试图在这里通过msmq实现我们自己的服务总线,我们放弃了,因为另一个问题不断蔓延.我们选择了NServiceBus,但MassTransit也是一款出色的产品(它是100%的开源,NServiceBus不是). ServiceStack在制作API和使用消息队列方面非常棒 – 我相信你可以用它来制作几分钟内充当队列前端的服务.

哦,我是否提到在NSB和MT的情况下,只需要10行代码就可以完全实现队列,发送者和处理程序?

– – – 添加 – – –

Udi Dahan(NServiceBus的主要贡献者之一)谈到了这一点:
“In-Order Messaging a Myth” by Udi Dahan
“Message Ordering: Is it Cost Effective?” with Udi Dahan

克里斯帕特森(公共交通的主要贡献者之一)
“Using Sagas to ensure proper sequential message order” question

StackOverflow问题/答案:
“Preserve message order when consuming MSMQ messages in a WCF application”

– – – 题 – – –

我必须说,我很困惑为什么你需要保证消息顺序 – 如果你使用HTTP / SOAP协议,你会处于相同的位置吗?我的猜测是否定的,那为什么它在MSMQ中是一个问题?

祝你好运,希望这会有帮助,

相关文章
相关标签/搜索