10

I am successfully using MassTransit for a silly sample application where I publish an message (an event) from a Publisher console application and I receive it at two different consumers which are also console applications using RabbitMq.

This is the whole sample project git repo: https://gitlab.com/DiegoDrivenDesign/DiDrDe.MessageBus

I want to have a project wrapping MassTransit functionality so that my Publisher and Consumers projects know nothing about MassTransit. The dependencies should go in this direction:

  • DiDrDe.MessageBus ==> MassTransit
  • DiDrDe.MessageBus ==> DiDrDe.Contracts
  • DiDrDe.Model ==> DiDrDe.Contracts
  • DiDrDe.Publisher ==> DiDrDe.MessageBus
  • DiDrDe.Publisher ==> DiDrDe.Contracts
  • DiDrDe.Publisher ==> DiDrDe.Model
  • DiDrDe.ConsumerOne ==> DiDrDe.Contracts
  • DiDrDe.ConsumerOne ==> DiDrDe.MessageBus
  • DiDrDe.ConsumerOne ==> DiDrDe.Model
  • DiDrDe.ConsumerTwo ==> DiDrDe.Contracts
  • DiDrDe.ConsumerTwo ==> DiDrDe.MessageBus
  • DiDrDe.ConsumerTwo ==> DiDrDe.Model

Notice how DiDrDe.MessageBus does not know anything about DiDrDe.Model because it's a generic project that should be valid for any message type.

To achieve this I am implementing the adapter pattern so that my custom interfaces IEventDtoBus (to publish events) and IEventDtoHandler<TEventDto> (to consume events) are all my Publisher and Consumers know. The MassTransit wrapper project (called DiDrDe.MessageBus) implements the adapters with an EventDtoBusAdapter composed of an IEventDtoBus and a EventDtoHandlerAdapter<TEventDto> as my only generic IConsumer<TEventDto> composed of an IEventDtoHandler<TEventDto>

The problem I have is with the way MassTransit requires the consumers to be registered because my consumer is a generic one and its type should not be known by the MassTransit wrapper at compile time.

I need to find a way to register the EventDtoHandlerAdapter<TEventDto> as a consumer for each TEventDto type I pass at runtime (as a collection of types for example). Please see my repository for all the details.

MassTransit supports an overload method that accepts a type (good! just what I want) but it also requires a second argument Func<type, object> consumerFactory and I don't know how to implement it.

UPDATE 1: The problem is that I cannot register this generic consumer like:

consumer.Consumer<EventDtoHandlerAdapter<ThingHappened>>();

because I get a compilation error

Severity Code Description Project File Line Suppression State Error CS0310 'EventDtoHandlerAdapter' must be a non-abstract type with a public parameterless constructor in order to use it as parameter 'TConsumer' in the generic type or method 'ConsumerExtensions.Consumer(IReceiveEndpointConfigurator, Action>)' DiDrDe.MessageBus C:\src\DiDrDe.MessageBus\DiDrDe.MessageBus\IoCC\Autofac\RegistrationExtensions.cs

UPDATE 2: I have tried several things and I have updated the project on my repo. These are my attempts in the MassTransit wrapper project. Notice how I am able to have everything working if I add a dependency to each message (event) I want to handle. But I don't want that.. I don't want this project to know ANYTHING about the messages it can handle. If only I could register consumers knowing only a message type..

cfg.ReceiveEndpoint(host, messageBusOptions.QueueName, consumer =>
{
    //THIS WORKS
    var eventDtoHandler = context.Resolve<IEventDtoHandler<ThingHappened>>();
    consumer.Consumer(() => new EventDtoHandlerAdapter<ThingHappened>(eventDtoHandler));

    // DOES NOT WORK
    //var typeEventDtoHandler = typeof(IEventDtoHandler<>).MakeGenericType(typeof(ThingHappened));
    //var eventDtoHandler = context.Resolve(typeEventDtoHandler);
    //consumer.Consumer(eventDtoHandler);

    // DOES NOT WORK
    //consumer.Consumer<EventDtoHandlerAdapter<ThingHappened>>(context);

    // DOES NOT WORK
    //var consumerGenericType = typeof(IConsumer<>);
    //var consumerThingHappenedType = consumerGenericType.MakeGenericType(typeof(ThingHappened));
    //consumer.Consumer(consumerThingHappenedType,  null);
});

UPDATE 3: Following Igor's advice, I try to do the following:

var adapterType = typeof(EventDtoHandlerAdapter<>).MakeGenericType(typeof(ThingHappened));
                            consumer.Consumer(adapterType, (Type x) => context.Resolve(x));

but I get a runtime error saying that

The requested service 'DiDrDe.MessageBus.EventDtoHandlerAdapter`1[[DiDrDe.Model.ThingHappened, DiDrDe.Model, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null]]' has not been registered. To avoid this exception, either register a component to provide the service, check for service registration using IsRegistered(), or use the ResolveOptional() method to resolve an optional dependency.

I have even tried to register EventDtoHandlerAdapter<> separately as a IConsumer in case that was the issue but no luck.

builder
    .RegisterGeneric(typeof(EventDtoHandlerAdapter<>))
    .As(typeof(IConsumer<>))
    .SingleInstance();

Also with:

builder
  .RegisterType<EventDtoHandlerAdapter<ThingHappened>>()
  .AsSelf();

and it tells me that

System.ObjectDisposedException: 'This resolve operation has already ended. When registering components using lambdas, the IComponentContext 'c' parameter to the lambda cannot be stored. Instead, either resolve IComponentContext again from 'c', or resolve a Func<> based factory to create subsequent components from

Just to clarify, The only consumer I need to register is my EventDtoHandlerAdapter<TEventDto>. It is generic, so essentially it will exist a registration per TEventDto that I support. The thing is that I don't need the types in advance so I need to operate with types.

UPDATE 4: A new attempt as suggested by Igor. This time with the "proxy". I have updated my repo with the last attempt for all details. I have my consumer and marker interface:

public interface IEventDtoHandler
{
}

public interface IEventDtoHandler<in TEventDto>
    : IEventDtoHandler
        where TEventDto : IEventDto
{
    Task HandleAsync(TEventDto eventDto);
}

and I have my own implementation of a consumer that knows nothing about MassTransit:

public class ThingHappenedHandler
    : IEventDtoHandler<ThingHappened>
{
    public Task HandleAsync(ThingHappened eventDto)
    {
        Console.WriteLine($"Received {eventDto.Name} " +
                            $"{eventDto.Description} at consumer one that uses an IEventDtoHandler");
        return Task.CompletedTask;
    }
}

Now my "wrapper consumer" is what I call the adapter because it knows about MassTransit (it implements MassTransit IConsumer).

public class EventDtoHandlerAdapter<TConsumer, TEventDto>
    : IConsumer<TEventDto>
        where TConsumer : IEventDtoHandler<TEventDto>
        where TEventDto : class, IEventDto
{
    private readonly TConsumer _consumer;

    public EventDtoHandlerAdapter(TConsumer consumer)
    {
        _consumer = consumer;
    }

    public async Task Consume(ConsumeContext<TEventDto> context)
    {
        await _consumer.HandleAsync(context.Message);
    }
}

Now the last step is to register my "wrapper consumer" with MassTransit. But since it's generic I don't know how to do it. This is the issue.

I can have all my consumer types scanned and registered in Autofac as suggested with:

var interfaceType = typeof(IEventDtoHandler);
var consumerTypes =
    AppDomain.CurrentDomain.GetAssemblies()
        .SelectMany(x => x.GetTypes())
        .Where(x => interfaceType.IsAssignableFrom(x)
                    && !x.IsInterface
                    && !x.IsAbstract)
        .ToList();

So now I have all the consumer types (all the implementations of IEventDtoHandler, including my ThingHappenedHandler). What now? How to register it?

Something like the following does not work:

foreach (var consumerType in consumerTypes)
{
    consumer.Consumer(consumerType, (Type x) => context.Resolve(x));
}

But I guess it's normal that it doesn't work because what I want to register is my EventDtoHandlerAdapter, which is the real IConsumer.

So, I think I didn't understand your suggestion. Sorry!

What I need is something like this:

//THIS WORKS
var eventDtoHandler = context.Resolve<IEventDtoHandler<ThingHappened>>();
consumer.Consumer(() => new EventDtoHandlerAdapter<IEventDtoHandler<ThingHappened>, ThingHappened>(eventDtoHandler));

But without using the ThingHappened model because the model should not be known. Here is where I remain stuck

UPDATE 5: New attempt as suggested by Chris Patterson (his solution was merged into master on my repo), but the problem remains.

To clarify, DiDrDe.MessageBus must be agnostic of any publisher, consumer and model. It should only depend on MassTransit and DiDrDe.Contracts, and Chris' solution has a line like:

cfg.ReceiveEndpoint(host, messageBusOptions.QueueName, consumer =>
{
    consumer.Consumer<EventDtoHandlerAdapter<ThingHappened>>(context);
});

That has a direct dependency on ThingHappened model. This is not allowed and it's actually no much different from the solution I already had which was:

cfg.ReceiveEndpoint(host, messageBusOptions.QueueName, consumer =>
{
    //THIS works, but it uses ThingHappened explicitly and I don't want that dependency
    var eventDtoHandler = context.Resolve<IEventDtoHandler<ThingHappened>>();
    consumer.Consumer(() => new EventDtoHandlerAdapter<ThingHappened>(eventDtoHandler));
});

Sorry if this was not clear enough, but DiDrDe.MessageBus will eventually be a nuGet package shared between many different consumer and publisher projects and it should not have any dependency to any specific message/model.

UPDATE 6: The question has been resolved. Thanks a lot to Igor and Chris for their time and help. I have pushed the solution to master on my repo.

PS: Unfortunately when I have two handlers in the same consumer handling the same Event, this solution has its limitations because it seems only one of the handlers is being executed (twice). I would expect both handlers to be executed or only one, but only once (not twice). But this is another subject already :)

diegosasw
  • 13,734
  • 16
  • 95
  • 159
  • Have you looked into `builder.RegisterGeneric` ? – cl0ud Oct 15 '18 at 11:59
  • yes, I have. Not sure how would RegisterGeneric play with this though.. Please see UPDATE 2 – diegosasw Oct 15 '18 at 15:21
  • @iberodev i'am also looking for similar kind of approach. Can I use your gitlab project? is it complete or any know issues are there? if yes, will you be able to udpate with latest code which you have? – bommina Apr 21 '19 at 09:43
  • 1
    @venkat.bommina feel free to clone my repo and adapt it to your needs – diegosasw Apr 29 '19 at 11:55

2 Answers2

10

Take a look at custom consumer convention here: https://github.com/MassTransit/MassTransit/tree/develop/src/MassTransit.Tests/Conventional https://github.com/MassTransit/MassTransit/blob/develop/tests/MassTransit.Tests/Conventional/ConventionConsumer_Specs.cs

Create your own IMyConsumerInterface, IMyMessageInterface plug it in the code from that test. Register it ConsumerConvention.Register<CustomConsumerConvention>(); before creating the bus. It should work.

Additionally you can create your own wrapper around the consumer context and have that passed together with the message.

LoadFrom (MassTransit.AutofacIntegration) didn't work for me with the custom convention so I had to manually register consumers

foreach (var consumer in consumerTypes)
  cfg.Consumer(consumer, (Type x) => _container.Resolve(x));

Alternatively if you want to use "proxy" approach do something like:

public class WrapperConsumer<TConsumer, TMessage> : IConsumer<TMessage>
    where TMessage : class, IMyMessageInterface 
    where TConsumer : IMyConsumerInterface<TMessage>
{
    private readonly TConsumer _consumer;

    public WrapperConsumer(TConsumer consumer)
    {
        _consumer = consumer;
    }

    public Task Consume(ConsumeContext<TMessage> context)
    {
        return _consumer.Consume(context.Message);
    }
}

...

// create wrapper registrations
cfg.Consumer(() => new WrapperConsumer<MyConsumer, MyMessage>(new MyConsumer()));

additional code applicable to both approaches
// marker interface
public interface IMyConsumerInterface
{
}

public interface IMyConsumerInterface<T> : IMyConsumerInterface
    where T : IMyMessageInterface 
{
    Task Consume(T message);
}

...

builder.RegisterAssemblyTypes(ThisAssembly)
           .AssignableTo<IMyConsumerInterface>()
           .AsSelf()
           .As<IMyConsumerInterface>();

...

var interfaceType = typeof(IMyConsumerInterface);
var consumerTypes = AppDomain.CurrentDomain.GetAssemblies().SelectMany(x => x.GetTypes())
    .Where(x => interfaceType.IsAssignableFrom(x) && !x.IsInterface && !x.IsAbstract)
    .ToList();

**RE: UPDATE 5**
builder.Register(context =>
{
    var ctx = context.Resolve<IComponentContext>();
    var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        cfg.ReceiveEndpoint(host, messageBusOptions.QueueName, consumer =>
        {
            foreach (var adapterType in adapterTypes)
                consumer.Consumer(adapterType, (Type type) => ctx .Resolve(adapterType));
        });
    });
    return busControl;
})
Ali
  • 468
  • 7
  • 18
Igor Dražić
  • 376
  • 2
  • 7
  • I am trying to understand your comment. Please see my Update 2. What do you have in consumerTypes? The type of an IConsumer<>? (in my case the eventDtoHandlerAdapter?) If so, do I have to register eplicitly with something like: `builder .RegisterGeneric(typeof(EventDtoHandlerAdapter<>))As(typeof(IConsumer<>)).InstancePerDependency();` Not sure I understand, sorry – diegosasw Oct 15 '18 at 15:31
  • Please see Update 3. I have tried your second suggestion without any luck. It fails at runtime telling me that my consumer has not been registered in Autofac. – diegosasw Oct 15 '18 at 15:47
  • My answer was about changing your design completely and abstract away masstransit using custom consumer conventions. If you want alternative I'll update my answer. – Igor Dražić Oct 16 '18 at 08:14
  • Thanks for your time, but I still don't get your answer. I tried the "proxy" approach. Mark my generic consumer interface with the mark interface, useful to register all the consumer implementations. But unfortunately that's not the issue. The issue is, once you have all the consumerTypes how do you register them in MassTransit? Having a type is not enough. Please see my UPDATE 4 and thanks again! – diegosasw Oct 16 '18 at 14:13
  • I've just added a bounty reward of 250 points to encourage an answer that uses the code I currently have on the specified sample git repository :) – diegosasw Oct 16 '18 at 17:15
  • 1
    (re: UPDATE 5, can't comment on Chris' answer) You're almost there, instead `consumer.Consumer>(context);` use `foreach (var adapterType in adapterTypes) consumer.Consumer(adapterType, (Type type) => ctx.Resolve(adapterType));`. You don't need model just contracts. Remove model and add Contracts – Igor Dražić Oct 17 '18 at 09:53
  • 1
    you really should not have every handler the same queue, they're unrelated commands or events, and processing them all from a single queue will eventually become an issue as load increases. – Chris Patterson Oct 17 '18 at 13:11
  • so you're saying every queue should essentially store only one type of message? I thought every microservice could use a queue, but handle multiple message types from that queue, although it makes sense that there is no more than a handler (consumer) per message type. Ta. – diegosasw Oct 17 '18 at 15:33
1

I submitted a pull request to your problem that works for me, consumer starts with no issues.

You could extend it to include the automatic registration of all the consumers on their own endpoints as well, if needed.

The trick was to properly register the handler types with the proper generic interface types. A little type mechanics, but it works for me on my end.

Chris Patterson
  • 28,659
  • 3
  • 47
  • 59
  • Thank you Chris. Merged into master. The only issue I see with your solution is that `DiDrDe.MessageBus` has a dependency on `DiDrDe.Model` and that's not allowed for what I want. Eventually DiDrDe.MessageBus will be an independent NuGet package that knows nothing about any model. Just about DiDrDe.Contracts and MassTransit. So the line `consumer.Consumer>(context);` is still problematic. – diegosasw Oct 17 '18 at 08:38
  • The problem remains. Please see UPDATE 5. Actually as per UPDATE 4 I already had an insufficient "solution" where it worked but at the price of having `DiDrDe.MessageBus` depending directly on `DiDrDe.Model`, and I don't want that. – diegosasw Oct 17 '18 at 09:02
  • I've resolved the solution after Chris' pointed me on the right direction thanks to your answer. – diegosasw Oct 17 '18 at 11:19
  • @diegosasw do you have a sample on how you fixed it ? – kuldeep Jun 19 '20 at 10:38
  • 1
    Probably [this right here](https://github.com/MassTransit/MassTransit/blob/develop/src/MassTransit.Tests/Conventional/ConventionConsumer_Specs.cs) on registering your own convention. – Chris Patterson Jun 19 '20 at 12:45
  • 2
    @kuldeep at my repo https://gitlab.com/DiegoDrivenDesign/DiDrDe.MessageBus/-/tree/master/DiDrDe.MessageBus.Infra.MassTransit.Autofac – diegosasw Jun 19 '20 at 14:09