Apache Camel快速入门(中)
3-3、Processor 处理器
Camel中另一个重要的元素是Processor处理器,它用于接收从控制端点、路由选择条件又或者另一个处理器的Exchange中传来的消息信息,并进行处理。Camel核心包和各个Plugin组件都提供了很多Processor的实现,开发人员也可以通过实现org.apache.camel.Processor接口自定义处理器(后者是通常做法)。
既然是做编码,那么我们自然可以在自定义的Processor处理器中做很多事情。这些事情可能包括处理业务逻辑、建立数据库连接去做业务数据存储、建立和某个第三方业务系统的RPC连接,但是我们一般不会那样做——那是Endpoint的工作。Processor处理器中最主要的工作是进行业务数据格式的转换和中间数据的临时存储。这样做是因为Processor处理器是Camel编排的路由中,主要进行Exchange输入输出消息交换的地方。
不过开发人员当然可以在Processor处理器中连接数据库。例如开发人员需要根据上一个Endpoint中携带的“订单编号前缀”信息,在Processor处理器中连接到一个独立的数据库中(或者缓存服务中)查找其对应的路由信息,以便动态决定下一个路由路径。由于Camel支持和JAVA语言的Spring框架无缝集成,所以要在Processor处理器中操作数据库只需要进行非常简单的配置。
以下代码片段是自定义的Processor处理器实现,其中的process(Exchange exchange)方法是必须进行实现的:
// 一个自定义处理器的实现
// 就是我们上文看到过的处理器实现了
public class OtherProcessor implements Processor {
......
@Override
public void process(Exchange exchange) throws Exception {
Message message = exchange.getIn();
String body = message.getBody().toString();
//===============
// 您可以在这里进行数据格式转换
// 并且将结果存储到out message中
//===============
// 存入到exchange的out区域
if(exchange.getPattern() == ExchangePattern.InOut) {
Message outMessage = exchange.getOut();
outMessage.setBody(body + " || other out");
}
}
......
}
注意,处理器Processor是和控制端点平级的概念。要看一个URI对应的实现是否是一个控制端点,最根本的就是看这个实现类是否实现了org.apache.camel.Endpoint接口;而要看一个路由中的元素是否是Processor处理器,最根本的就是看这个类是否实现了org.apache.camel.Processor接口。
3-4、Routing路由条件
在控制端点和处理器之间、处理器和处理器之间,Camel允许开发人员进行路由条件设置。例如开发人员可以拥有当Exchange In Message的内容为A的情况下将消息送入下一个处理器A,当Exchange In Message的内容为B时将消息送入下一个处理器B的处理能力。又例如,无论编排的路由中上一个元素的处理消息如何,都将携带消息的Exchange对象复制 多份,分别送入下一处理器X、Y、Z。开发人员甚至还可以通过路由规则完成Exchange到多个Endpoint的负载传输。
Camel中支持的路由规则非常丰富,包括:Message Filter、Based Router、Dynamic Router、Splitter、Aggregator、Resequencer等等。在Camel的官方文档中使用了非常形象化的图形来表示这些路由功能(http://camel.apache.org/enterprise-integration-patterns.html):
实际上EIP规则中所描述的大部分业务集成模式,在以上页面都能找到对应的图形化表达。但由于篇幅和本专题的中心思想限制,恕笔者不能对Camel中的路由规则逐一讲解。这里我们选取两个比较有代表性的路由规则进行讲解:Content Based Router和Recipient List。希望对各位读者理解Camel中的路由规则有所帮助:
3-4-1、Content Based Router 基于内容的路由
把Content Based Router译为基于内容的路由,笔者觉得更为贴切(并不能译作基本路由,实际上你无法定义什么是基本路由)。它并不是一种单一的路由方式,而是多种基于条件和判断表达式的路由方式。其中可能包括choice语句/方法、when语句/方法、otherwise语句/方法。请看以下示例:
package io.renren.config;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.model.ModelCamelContext;
import org.apache.camel.model.language.JsonPathExpression;
/**
* 使用条件选择进行路由编排
*
* @author yinwenjie
*/
public class ChoiceCamel extends RouteBuilder {
public static void main(String[] args) throws Exception {
// 这是camel上下文对象,整个路由的驱动全靠它了。
ModelCamelContext camelContext = new DefaultCamelContext();
// 启动route
camelContext.start();
// 将我们编排的一个完整消息路由过程,加入到上下文中
camelContext.addRoutes(new ChoiceCamel());
// 通用没有具体业务意义的代码,只是为了保证主线程不退出
synchronized (ChoiceCamel.class) {
ChoiceCamel.class.wait();
}
}
@Override
public void configure() throws Exception {
// 这是一个JsonPath表达式,用于从http携带的json信息中,提取orgId属性的值
JsonPathExpression jsonPathExpression = new JsonPathExpression("$.data.orgId");
jsonPathExpression.setResultType(String.class);
// 通用使用http协议接受消息
from("jetty:http://0.0.0.0:8282/choiceCamel")
// 首先送入HttpProcessor,
// 负责将exchange in Message Body之中的stream转成字符串
// 当然,不转的话,下面主要的choice操作也可以运行
// HttpProcessor中的实现和上文代码片段中的一致,这里就不再重复贴出
.process((Processor) new HttpProcessor())
// 将orgId属性的值存储 exchange in Message的header中,以便后续进行判断
.setHeader("orgId", jsonPathExpression)
.choice()
// 当orgId == yuanbao,执行OtherProcessor
// 当orgId == yinwenjie,执行OtherProcessor2
// 其它情况执行OtherProcessor3
.when(header("orgId").isEqualTo("yuanbao"))
.process(new OtherProcessor())
.when(header("orgId").isEqualTo("yinwenjie"))
.process(new OtherProcessor2())
.otherwise()
.process(new OtherProcessor3())
// 结束
.endChoice();
}
/**
* 这个处理器用来完成输入的json格式的转换
* 和上一篇文章出现的HttpProcessor 内容基本一致。就不再贴出了
*
* @author yinwenjie
*/
public class HttpProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
}
}
/**
* 另一个处理器OtherProcessor
*
* @author yinwenjie
*/
public class OtherProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
Message message = exchange.getIn();
String body = message.getBody().toString();
// 存入到exchange的out区域
if (exchange.getPattern() == ExchangePattern.InOut) {
Message outMessage = exchange.getOut();
outMessage.setBody(body + " || 被OtherProcessor处理");
}
}
}
/**
* 很简单的处理器OtherProcessor2
* 和OtherProcessor基本相同,就不再重复贴出
*
* @author yinwenjie
*/
public class OtherProcessor2 implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
Message message = exchange.getIn();
String body = message.getBody().toString();
// 存入到exchange的out区域
if (exchange.getPattern() == ExchangePattern.InOut) {
Message outMessage = exchange.getOut();
outMessage.setBody(body + " || 被OtherProcessor2处理");
}
};
}
/**
* 很简单的处理器OtherProcessor3
* 和OtherProcessor基本相同,就不再重复贴出
*
* @author yinwenjie
*/
public class OtherProcessor3 implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
Message message = exchange.getIn();
String body = message.getBody().toString();
// 存入到exchange的out区域
if (exchange.getPattern() == ExchangePattern.InOut) {
Message outMessage = exchange.getOut();
outMessage.setBody(body + " || 被OtherProcessor3处理");
}
};
}
}
以上代码片段中,开发人员首先使用JsonPath表达式,从Http中携带的json信息中寻找到orgId这个属性的值,然后将这个值存储在Exchange的header区域(这样做只是为了后续方便判断,您也可以将值存储在Exchange的properties区域,还可以直接使用JsonPath表达式进行判断) 。接下来,通过判断存储在header区域的值,让消息路由进入不同的Processor处理器。由于我们设置的from-jetty-endpoint中默认的Exchange Pattern值为InOut,所以在各个Processor处理器中完成处理后 Out Message的Body内容会以Http响应结果的形式返回到from-jetty-endPoint中。最后我们将在测试页面上看到Processor处理器中的消息值。
Camel中支持绝大多数被开发人员承认和使用的表达式:正则式、XPath、JsonPath等。如果各位读者对JsonPath的语法还不熟悉的话,可以参考Google提供的说明文档(https://code.google.com/p/json-path/)。为了测试以上代码片段的工作效果,我们使用Postman工具向指定的地址发送一段json信息,并观察整个路由的执行效果。如下图所示:
当orgId的值为yuanbao时的执行效果
当orgId的值为yinwenjie时的执行效果
关于路由判断,Camel中提供了丰富的条件判断手段。除了我们在本小节中使用的isEqualTo方式还包括:isGreaterThan、isGreaterThanOrEqualTo、isLessThan、isLessThanOrEqualTo、isNotEqualTo、in(多个值)、contains、regex等等,它们的共同点是这些方法都返回某个实现了org.apache.camel.Predicate接口的类。
3-5-2、Recipient List 接收者列表
在本小节上部分的介绍中,我们说明了怎么使用条件判断向若干可能的路由路径中的某一条路径传送消息。那么如何做到根据判断条件,向若干可能的路径中的其中多条路径传送同一条消息呢?又或者向若干条可能的路径全部传输同一条消息呢?
在Camel中可能被选择的消息路由路径称为接收者,Camel提供了多种方式向路由中可能成为下一处理元素的多个接收者发送消息:静态接收者列表(Static Recipient List)、动态接收者列表(Dynamic Recipient List)和 循环动态路由(Dynamic Router)。下面我们对这几种接收者列表形式进行逐一讲解。
3-5-2-1、使用multicast处理Static Recipient List
使用multicast方式时,Camel将会把上一处理元素输出的Exchange复制多份发送给这个列表中的所有接收者,并且按顺序逐一执行(可设置为并行处理)这些接收者。这些接收者可能是通过Direct连接的另一个路由,也可能是Processor或者某个单一的Endpoint。需要注意的是,Excahnge是在Endpoint控制端点和Processor处理器间或者两个Processor处理器间唯一能够有效携带Message的元素,所以将一条消息复制多份并且让其执行不相互受到影响,那么必然就会对Exchange对象进行复制(是复制,是复制,虽然主要属性内容相同,但是这些Exchange使用的内存区域都是不一样的,ExchangeId也不一样)。以下是multicast使用的简单示例代码:
package com.yinwenjie.test.cameltest.helloworld;
......
/**
* 测试组播路由
* @author yinwenjie
*/
public class MulticastCamel extends RouteBuilder {
public static void main(String[] args) throws Exception {
// 这是camel上下文对象,整个路由的驱动全靠它了。
ModelCamelContext camelContext = new DefaultCamelContext();
// 启动route
camelContext.start();
// 将我们编排的一个完整消息路由过程,加入到上下文中
camelContext.addRoutes(new MulticastCamel());
// 通用没有具体业务意义的代码,只是为了保证主线程不退出
synchronized (MulticastCamel.class) {
MulticastCamel.class.wait();
}
}
@Override
public void configure() throws Exception {
// 这个线程池用来进行multicast中各个路由线路的并发执行
ExecutorService executorService = Executors.newFixedThreadPool(10);
MulticastDefinition multicastDefinition = from("jetty:http://0.0.0.0:8282/multicastCamel").multicast();
// multicast 中的消息路由可以顺序执行也可以并发执行
// 这里我们演示并发执行
multicastDefinition.setParallelProcessing(true);
// 为并发执行设置一个独立的线程池
multicastDefinition.setExecutorService(executorService);
// 注意,multicast中各路由路径的Excahnge都是基于上一路由元素的excahnge复制而来
// 无论前者Excahnge中的Pattern如何设置,其处理结果都不会反映在最初的Excahnge对象中
multicastDefinition.to(
"log:helloworld1?showExchangeId=true"
,"log:helloworld2?showExchangeId=true")
// 一定要使用end,否则OtherProcessor会被做为multicast中的一个分支路由
.end()
// 所以您在OtherProcessor中看到的Excahnge中的Body、Header等属性内容
// 不会有“复制的Exchange”设置的任何值的痕迹
.process(new OtherProcessor());
}
/**
* 另一个处理器
* @author yinwenjie
*/
public class OtherProcessor implements Processor {
/* (non-Javadoc)
* @see org.apache.camel.Processor#process(org.apache.camel.Exchange)
*/
@Override
public void process(Exchange exchange) throws Exception {
Message message = exchange.getIn();
LOGGER.info("OtherProcessor中的exchange" + exchange);
String body = message.getBody().toString();
// 存入到exchange的out区域
if(exchange.getPattern() == ExchangePattern.InOut) {
Message outMessage = exchange.getOut();
outMessage.setBody(body + " || 被OtherProcessor处理");
}
}
}
}
以上代码片段中,我们使用multicast将原始的Exchange复制了多份,分别传送给multicast中的两个接收者,并且为了保证两个接收者的处理过程是并行的,我们还专门为multicast设置了一个线程池(不设置的话Camel将自行设置)。在以上的代码片段中,在multicast路由定义之后我们还设置了一个OtherProcessor处理器,它主要作用就是查看OtherProcessor中的Exchange对象的状态。下面展示了以上代码片段的执行效果:
[2016-06-24 16:07:43] INFO pool-1-thread-7 Exchange[Id: ID-yinwenjie-240-54110-1466755310568-0-20, ExchangePattern: InOut, BodyType: String, Body: {“data”:{“orgId”:”yinwenjie”},”token”:”d9c33c8f-ae59-4edf-b37f-290ff208de2e”,”desc”:””}] (MarkerIgnoringBase.java:96)
[2016-06-24 16:07:43] INFO pool-1-thread-8 Exchange[Id: ID-yinwenjie-240-54110-1466755310568-0-19, ExchangePattern: InOut, BodyType: String, Body: {“data”:{“orgId”:”yinwenjie”},”token”:”d9c33c8f-ae59-4edf-b37f-290ff208de2e”,”desc”:””}] (MarkerIgnoringBase.java:96)
[2016-06-24 16:07:43] INFO qtp1060925979-18 OtherProcessor中的exchange [id:ID-yinwenjie-240-54110-1466755310568-0-16]Exchange[Message: {“data”:{“orgId”:”yinwenjie”},”token”:”d9c33c8f-ae59-4edf-b37f-290ff208de2e”,”desc”:””}] (MulticastCamel.java:74)
通过执行结果可以看到,在multicast中的两个接收者(两个路由分支的设定)分别在我们设置的线程池中运行,线程ID分别是【pool-1-thread-7】和【pool-1-thread-8】。在multicast中的所有路由分支都运行完成后,OtherProcessor处理器的实例在【qtp1060925979-18】线程中继续运行(jetty:http-endpint对于本次请求的处理原本就在这个线程上运行)。
请各位读者特别注意以上三句日志所输出的ExchangeId,它们是完全不同的三个Exchange实例!其中在multicast的两个路由分支中承载Message的Excahnge对象,它们的Exchange-ID号分别为【ID-yinwenjie-240-54110-1466755310568-0-20】和【ID-yinwenjie-240-54110-1466755310568-0-19】,来源则是multicast对原始Exchange对象的复制,原始Exchagne对象的Exchange-ID为【ID-yinwenjie-240-54110-1466755310568-0-16】。
3-4-2-2、处理Dynamic Recipient List
在编排路由,很多情况下开发人员不能确定有哪些接收者会成为下一个处理元素:因为它们需要由Exchange中所携带的消息内容来动态决定下一个处理元素。这种情况下,开发人员就需要用到recipient方法对下一路由目标进行动态判断。以下代码示例中,我们将三个已经编排好的路由注册到Camel服务中,并通过打印在控制台上的结果观察其执行:
**第一个路由 DirectRouteA**
public class DirectRouteA extends RouteBuilder {
/* (non-Javadoc)
* @see org.apache.camel.builder.RouteBuilder#configure()
*/
@Override
public void configure() throws Exception {
from("jetty:http://0.0.0.0:8282/dynamicCamel")
.setExchangePattern(ExchangePattern.InOnly)
.recipientList().jsonpath("$.data.routeName").delimiter(",")
.end()
.process(new OtherProcessor());
}
}
第二个和第三个路由
/**
* @author yinwenjie
*/
public class DirectRouteB extends RouteBuilder {
/* (non-Javadoc)
* @see org.apache.camel.builder.RouteBuilder#configure()
*/
@Override
public void configure() throws Exception {
// 第二个路由和第三个路由的代码都相似
// 唯一不同的是类型
from("direct:directRouteB")
.to("log:DirectRouteB?showExchangeId=true");
}
}
注册到Camel服务中,并开始执行
......
public static void main(String[] args) throws Exception {
// 这是camel上下文对象,整个路由的驱动全靠它了。
ModelCamelContext camelContext = new DefaultCamelContext();
// 启动route
camelContext.start();
// 将我们编排的一个完整消息路由过程,加入到上下文中
camelContext.addRoutes((new DynamicCamel()).new DirectRouteA());
camelContext.addRoutes((new DynamicCamel()).new DirectRouteB());
camelContext.addRoutes((new DynamicCamel()).new DirectRouteC());
// 通用没有具体业务意义的代码,只是为了保证主线程不退出
synchronized (DynamicCamel.class) {
DynamicCamel.class.wait();
}
}
......
DirectRouteB路由和DirectRouteC路由中的代码非常简单,就是从通过direct连接到本路由的上一个路由实例中获取并打印Exchange对象的信息。所以各位读者可以看到以上代码片段只列举了DirectRouteB的代码信息。DirectRouteA路由中“ExchangePattern.InOnly”的作用在上文中已经讲过,这里就不再进行赘述了。需要重点说明的是recipientList方法,这个方法可以像multicast方法那样进行并发执行或者运行线程池的设置,但是在DirectRouteA的代码中我们并没有那样做,这是为了让读者看recipientList或者multicast方法的顺序执行执行效果。以下是我们启动Camel服务后,从Postman(或者其他测试工具)传入的JSON格式的信息:
{“data”:{“routeName”:”direct:directRouteB,direct:directRouteC”},”token”:”d9c33c8f-ae59-4edf-b37f-290ff208de2e”,”desc”:””}
recipientList方法将以 .data.routeName 中指定的路由信息动态决定一下个或者多个消息接收者,以上JSON片段中我们指定了两个“direct:directRouteB,direct:directRouteC”。那么recipientList会使用delimiter方法中设置的“,”作为分隔符来分别确定这两个接收者。观察到的执行效果
[2016-06-26 10:31:53] INFO qtp1896561093-16 Exchange[Id: ID-yinwenjie-240-55214-1466908306101-0-3, ExchangePattern: InOnly, BodyType: org.apache.camel.converter.stream.InputStreamCache, Body: [Body is instance of org.apache.camel.StreamCache]] (MarkerIgnoringBase.java:96)
[2016-06-26 10:31:53] INFO qtp1896561093-16 Exchange[Id: ID-yinwenjie-240-55214-1466908306101-0-4, ExchangePattern: InOnly, BodyType: org.apache.camel.converter.stream.InputStreamCache, Body: [Body is instance of org.apache.camel.StreamCache]] (MarkerIgnoringBase.java:96)
[2016-06-26 10:31:53] INFO qtp1896561093-16 OtherProcessor中的exchange [id:ID-yinwenjie-240-55214-1466908306101-0-1]Exchange[Message: [Body is instance of org.apache.camel.StreamCache]] (DynamicCamel.java:100)
静态路由和动态路由在执行效果上有很多相似之处。例如在两种路径选择方式中,路由分支上的接收者中使用的Exchange对象的来源都是对上一执行元素所输出的Exchange对象的复制,这些Exchange对象除了其中携带的业务内容相同外,ExchangeID是不一样,也就是说每个路由分支的Exchange对象都不相同。所以各路由分支的消息都不受彼此影响。另外动态路由和静态路由都支持对路由分支的顺序执行和并发执行,都可以为并发执行设置独立的线程池。
从以上执行效果中我们可以看到,由于我们没有设置动态路由是并发执行,所以各个需要执行的路由分支都是由名为【qtp1896561093-16】的Camel服务线程依次执行,并且每个路由分支的Exchange对象都不受彼此影响。另外,请注意以上执行结果的最后一条日志信息,它是在路由分支以外对OtherProcessor处理器的执行。由此可见无论路由分支如何执行,都不会影响路由分支以外的元素执行时所使用的Exchange对象。
3-4-2-3循环动态路由 Dynamic Router
动态循环路由的特点是开发人员可以通过条件表达式等方式,动态决定下一个路由位置。在下一路由位置处理完成后Exchange将被重新返回到路由判断点,并由动态循环路由再次做出新路径的判断。如此循环执行直到动态循环路由不能再找到任何一条新的路由路径为止。下图来源于官网(http://camel.apache.org/dynamic-router.html),展示了动态循环路由的工作效果:
这里可以看出动态循环路由(dynamicRouter)和之前介绍的动态路由(recipientList)在工作方式上的差异。dynamicRouter一次选择只能确定一条路由路径,而recipientList只进行一次判断并确定多条路由分支路径;dynamicRouter确定的下一路由在执行完成后,Exchange对象还会被返回到dynamicRouter中以便开始第二次循环判断,而recipientList会为各个分支路由复制一个独立的Exchange对象,并且各个分支路由执行完成后Exchange对象也不会返回到recipientList;下面我们还是通过源代码片段,向各位读者展示dynamicRouter的使用方式。在代码中,我们编排了三个路由DirectRouteA主要负责通过Http协议接收处理请求,并执行dynamicRouter。DirectRouteB和DirectRouteC两个路由是可能被dynamicRouter选择的分支路径:
DirectRouteA
/**
* 第一个路由,主要用于定义整个路由的起点
* 通过Http协议接收处理请求
* @author yinwenjie
*/
public class DirectRouteA extends RouteBuilder {
/* (non-Javadoc)
* @see org.apache.camel.builder.RouteBuilder#configure()
*/
@Override
public void configure() throws Exception {
from("jetty:http://0.0.0.0:8282/dynamicRouterCamel")
// 使用dynamicRouter,进行“动态路由”循环,
// 直到指定的下一个元素为null为止
.dynamicRouter().method(this, "doDirect")
.process(new OtherProcessor());
}
/**
* 该方法用于根据“动态循环”的次数,确定下一个执行的Endpoint
* @param properties 通过注解能够获得的Exchange中properties属性,可以进行操作,并反映在整个路由过程中
* @return
*/
public String doDirect(@Properties Map<String, Object> properties) {
// 在Exchange的properties属性中,取出Dynamic Router的循环次数
AtomicInteger time = (AtomicInteger)properties.get("time");
if(time == null) {
time = new AtomicInteger(0);
properties.put("time", time);
} else {
time = (AtomicInteger)time;
}
LOGGER.info("这是Dynamic Router循环第:【" + time.incrementAndGet() + "】次执行!执行线程:" + Thread.currentThread().getName());
// 第一次选择DirectRouteB
if(time.get() == 1) {
return "direct:directRouteB";
}
// 第二次选择DirectRouteC
else if(time.get() == 2) {
return "direct:directRouteC";
}
// 第三次选择一个Log4j-Endpoint执行
else if(time.get() == 3) {
return "log:DirectRouteA?showExchangeId=true&showProperties=ture&showBody=false";
}
// 其它情况返回null,终止 dynamicRouter的执行
return null;
}
}
在DirectRouteA中我们使用“通过一个method方法返回信息”的方式确定dynamicRouter“动态循环路由”的下一个Endpoint。当然在实际使用中,开发人员还可以有很多方式向dynamicRouter“动态循环路由”返回指定的下一Endpoint。例如使用JsonPath指定JSON格式数据中的某个属性值,或者使用XPath指定XML数据中的某个属性值,又或者使用header方法指定Exchange中Header部分的某个属性。但是无论如何请开发人员确定一件事情:向dynamicRouter指定下一个Endpoint的方式中是会返回null进行循环终止的,否则整个dynamicRouter会无限的执行下去。
以上doDirect方法中,我们将一个计数器存储在了Exchange对象的properties区域,以便在同一个Exchange对象执行doDirect方法时进行计数操作。当同一个Exchange对象第一次执行动态循环路由判断时,选择directRouteB最为一下路由路径;当Exchange对象第二次执行动态循环路由判断时,选择DirectRouteC作为下一路由路径;当Exchange对象第三次执行时,选择一个Log4j-Endpoint作为下一个路由路径;当Exchange对象第四次执行时,作为路由路径判断的方法doDirect返回null,以便终止dynamicRouter的执行。
不能在DirectRouteA类中定义一个全局变量作为循环路由的计数器,因为由Jetty-HttpConsumer生成的线程池中,线程数量和线程对象是固定的,并且Camel也不是为每一个Exchange对象的运行创建新的DirectRouteA对象实例。所以如果在DirectRouteA类中定义全局变量作为循环路由的计数器,各位读者自己想想会发生什么样的结果吧。别骂娘……
DirectRouteB 和 DirectRouteC
/**
* 这是另一条路由分支
* @author yinwenjie
*/
public class DirectRouteC extends RouteBuilder {
@Override
public void configure() throws Exception {
from("direct:directRouteC")
.to("log:DirectRouteC?showExchangeId=true&showProperties=ture&showBody=false");
}
}
由于DirectRouteB和DirectRouteC两个路由定义的代码非常类似,所以这里只贴出其中一个。启动Camel应用程序,并将路由加入CamelContext
......
public static void main(String[] args) throws Exception {
// 这是camel上下文对象,整个路由的驱动全靠它了。
ModelCamelContext camelContext = new DefaultCamelContext();
// 启动route
camelContext.start();
// 将我们编排的一个完整消息路由过程,加入到上下文中
DynamicRouterCamel dynamicRouterCamel = new DynamicRouterCamel();
camelContext.addRoutes(dynamicRouterCamel.new DirectRouteA());
camelContext.addRoutes(dynamicRouterCamel.new DirectRouteB());
camelContext.addRoutes(dynamicRouterCamel.new DirectRouteC());
// 通用没有具体业务意义的代码,只是为了保证主线程不退出
synchronized (DynamicRouterCamel.class) {
DynamicRouterCamel.class.wait();
}
}
......
运行效果
[2016-06-27 20:44:52] INFO qtp1392999621-16 这是Dynamic Router循环第:【1】次执行!执行线程:qtp1392999621-16 (DynamicRouterCamel.java:105)
[2016-06-27 20:44:56] INFO qtp1392999621-16 Exchange[Id: ID-yinwenjie-240-57818-1467030193866-0-3, ExchangePattern: InOut, BodyType: org.apache.camel.converter.stream.InputStreamCache] (MarkerIgnoringBase.java:96)
[2016-06-27 20:44:56] INFO qtp1392999621-16 这是Dynamic Router循环第:【2】次执行!执行线程:qtp1392999621-16 (DynamicRouterCamel.java:105)
[2016-06-27 20:44:56] INFO qtp1392999621-16 Exchange[Id: ID-yinwenjie-240-57818-1467030193866-0-3, ExchangePattern: InOut, BodyType: org.apache.camel.converter.stream.InputStreamCache] (MarkerIgnoringBase.java:96)
[2016-06-27 20:44:56] INFO qtp1392999621-16 这是Dynamic Router循环第:【3】次执行!执行线程:qtp1392999621-16 (DynamicRouterCamel.java:105)
[2016-06-27 20:44:56] INFO qtp1392999621-16 Exchange[Id: ID-yinwenjie-240-57818-1467030193866-0-3, ExchangePattern: InOut, BodyType: org.apache.camel.converter.stream.InputStreamCache] (MarkerIgnoringBase.java:96)
[2016-06-27 20:44:56] INFO qtp1392999621-16 这是Dynamic Router循环第:【4】次执行!执行线程:qtp1392999621-16 (DynamicRouterCamel.java:105)
[2016-06-27 20:44:56] INFO qtp1392999621-16 最后exchangeID = ID-yinwenjie-240-57818-1467030193866-0-3 | org.apache.camel.converter.stream.InputStreamCache@2abaa89c || 被OtherProcessor处理 | time = 4 (DynamicRouterCamel.java:150)
从以上执行效果看,无论dynamicRouter执行的是第几次循环判断,Exchange都是同一个(ID号为【ID-yinwenjie-240-57818-1467030193866-0-3】)。
Apache Camel快速入门()https://mp.csdn.net/editor/html/116296801
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/150485.html