Skip to content

Commit 7a4fab1

Browse files
committed
🚧 work in propress for rule engine about thingsboard
1 parent 1d42fb0 commit 7a4fab1

File tree

9 files changed

+167
-12
lines changed

9 files changed

+167
-12
lines changed

IOT-Guide-RateLimiting/src/main/java/iot/technology/ratelimiting/config/AppConfig.java

Lines changed: 0 additions & 11 deletions
This file was deleted.

IOT-Guide-RuleEngine-ThingsBoard/pom.xml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,5 +29,29 @@
2929
<groupId>iot.technology</groupId>
3030
<artifactId>IOT-Guide-RuleEngine-ThingsBoard</artifactId>
3131

32+
<dependencies>
33+
<dependency>
34+
<groupId>org.springframework.boot</groupId>
35+
<artifactId>spring-boot-starter-web</artifactId>
36+
<scope>provided</scope>
37+
</dependency>
38+
<dependency>
39+
<groupId>iot.technology</groupId>
40+
<artifactId>IOT-Guide-Actor</artifactId>
41+
<version>1.0-SNAPSHOT</version>
42+
</dependency>
43+
</dependencies>
44+
45+
<build>
46+
<plugins>
47+
<plugin>
48+
<groupId>org.springframework.boot</groupId>
49+
<artifactId>spring-boot-maven-plugin</artifactId>
50+
<version>${spring-boot.version}</version>
51+
</plugin>
52+
</plugins>
53+
</build>
54+
55+
3256

3357
</project>
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
create table if not exists relation
2+
(
3+
from_id bigint null comment '',
4+
from_type varchar(255) null comment '"源"类型',
5+
to_id bigint null comment '目标',
6+
to_type varchar(255) null comment '目标类型',
7+
relation_type_group varchar(255) null,
8+
relation_type varchar(255) null
9+
) comment '关系表';
10+
11+
create table if not exists rule_chain
12+
(
13+
id bigint auto_increment
14+
primary key,
15+
create_time bigint null comment '创建时间',
16+
name varchar(255) null,
17+
type varchar(255) null,
18+
first_rule_node_id bigint null
19+
) comment '规则链';
20+
21+
create table if not exists rule_node
22+
(
23+
id bigint not null
24+
primary key,
25+
rule_chain_id bigint not null comment '规则链编号',
26+
configuration varchar(10000) null comment '配置项',
27+
type varchar(255) null comment '类型',
28+
name varchar(255) null comment '名称',
29+
create_time bigint null comment '创建时间'
30+
) comment '规则点';
31+
32+
33+
34+

IOT-Guide-RuleEngine-ThingsBoard/src/main/java/iot/technology/thingsboard/RuleEngineThingsBoardServer.java renamed to IOT-Guide-RuleEngine-ThingsBoard/src/main/java/iot/technology/thingsboard/ruleengine/RuleEngineThingsBoardServer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package iot.technology.thingsboard;
1+
package iot.technology.thingsboard.ruleengine;
22

33
/**
44
* @author james mu
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package iot.technology.thingsboard.ruleengine.common;
2+
3+
import java.util.concurrent.ExecutorService;
4+
import java.util.concurrent.ForkJoinPool;
5+
6+
/**
7+
* @author mushuwei
8+
*/
9+
public class ActorExecutors {
10+
11+
public static ExecutorService newWorkStealingPool(int parallelism, String namePrefix) {
12+
return new ForkJoinPool(parallelism,
13+
new ActorForkJoinWorkerThreadFactory(namePrefix),
14+
null, true);
15+
}
16+
17+
private static ExecutorService newWorkStealingPool(int parallelism, Class clazz) {
18+
return newWorkStealingPool(parallelism, clazz.getSimpleName());
19+
20+
}
21+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package iot.technology.thingsboard.ruleengine.common;
2+
3+
import lombok.NonNull;
4+
import lombok.ToString;
5+
6+
import java.util.concurrent.ForkJoinPool;
7+
import java.util.concurrent.ForkJoinWorkerThread;
8+
import java.util.concurrent.atomic.AtomicLong;
9+
10+
/**
11+
* @author mushuwei
12+
*/
13+
@ToString
14+
public class ActorForkJoinWorkerThreadFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory {
15+
16+
private final String namePrefix;
17+
private final AtomicLong threadNumber = new AtomicLong(1);
18+
19+
public ActorForkJoinWorkerThreadFactory(@NonNull String namePrefix) {
20+
this.namePrefix = namePrefix;
21+
}
22+
23+
@Override
24+
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
25+
ForkJoinWorkerThread thread = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
26+
thread.setName(namePrefix + "-" + thread.getPoolIndex() + "-" + threadNumber.getAndIncrement());
27+
return thread;
28+
}
29+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package iot.technology.thingsboard.ruleengine.service;
2+
3+
/**
4+
* @author mushuwei
5+
*/
6+
public interface ActorService {
7+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package iot.technology.thingsboard.ruleengine.service;
2+
3+
import iot.technology.actor.ActorSystem;
4+
import iot.technology.actor.core.ActorRef;
5+
import lombok.extern.slf4j.Slf4j;
6+
import org.springframework.stereotype.Service;
7+
8+
/**
9+
* @author mushuwei
10+
*/
11+
@Service
12+
@Slf4j
13+
public class ActorSystemContext {
14+
15+
private ActorSystem system;
16+
17+
private ActorRef appActor;
18+
19+
private int actorThroughput;
20+
21+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package iot.technology.thingsboard.ruleengine.service;
2+
3+
import iot.technology.actor.common.ActorThreadFactory;
4+
import iot.technology.thingsboard.ruleengine.common.ActorExecutors;
5+
import lombok.extern.slf4j.Slf4j;
6+
import org.springframework.stereotype.Service;
7+
8+
import java.util.concurrent.ExecutorService;
9+
import java.util.concurrent.Executors;
10+
11+
/**
12+
* @author mushuwei
13+
*/
14+
@Service
15+
@Slf4j
16+
public class DefaultActorService implements ActorService {
17+
18+
19+
private ExecutorService initDispatcherExecutor(String dispatcherName, int poolSize) {
20+
if (poolSize == 0) {
21+
int cores = Runtime.getRuntime().availableProcessors();
22+
poolSize = Math.max(1, cores / 2);
23+
}
24+
if (poolSize == 1) {
25+
return Executors.newSingleThreadExecutor(ActorThreadFactory.forName(dispatcherName));
26+
} else {
27+
return ActorExecutors.newWorkStealingPool(poolSize, dispatcherName);
28+
}
29+
}
30+
}

0 commit comments

Comments
 (0)