kafka2_企业级案例

这篇具有很好参考价值的文章主要介绍了kafka2_企业级案例。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

课程回顾:

讲述入门案例;

1,从端口接收数据,通过channel ,sink 最终这个数据到日志, – 控制台输出 ,到logj.properties,

nc -l localhost port , source TCP

2监控hive的日志文件,将数据输出到hdfs上存储

2.1监控单个文件的追加,读取 sink输出到日志

tail - f 文件路径

exec source .其余的不变. echo > 覆盖 echo’>> 追加, 使用的追加爱形式

2.2监控单个文件数据的增加,读sink输出到hdfs

spooling source

2,企业实例

使用 Flume1 监控一个端口,其 sink 组中的 sink 分别对接 Flume2 和 Flume3,

Flume1监控端口数据,将监控的内容通过轮询或者随机的方式给到Flume2和Flume3

Flume2将内容打印到控制台

Flume3将内容打印到控制台

2.1采用FailoverSinkProcessor,实现故障转移的功能。配置文件编写

#Flume1.conf
#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2

#Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 6666
#channel selector
a1.sources.r1.selector.type = replicating

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 7777
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 8888

#Sink processor
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
# round_robin 轮询
a1.sinkgroups.g1.processor.selector = round_robin
# random 随机
#a1.sinkgroups.g1.processor.selector = random

#Bind
a1.sources.r1.channels = c1 c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
#Flume2.conf
# example.conf: A single-node Flume configuration
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = localhost
a2.sources.r1.port = 7777

# Describe the sink
# sink
a2.sinks.k1.type = logger
# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
#Flume3.conf
#Named
a3.sources = r1
a3.channels = c1
a3.sinks = k1
#Source
a3.sources.r1.type = avro
a3.sources.r1.bind = localhost
a3.sources.r1.port = 8888
#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100
#Sink
# Describe the sink
a3.sinks.k1.type = logger
#Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
2.2启动命令

先执行下游服务端

flume-ng agent -c $FLUME_HOME/conf -f$FLUME_HOME/jobs/group2/flume3.conf -n a3 -Dflume.root.logger=INFO,console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/group2/flume2.conf -n a2 -Dflume.root.logger=INFO,console
再执行力上游的客户端
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/group2/flume1.conf -n a1 -Dflume.root.logger=INFO,console

2.3测试

nc localhost 6666

3企业案例3:故障转移案例

Flume1监控端口数据,将监控到的内容发送给Active的Sink

Flume2将内容打印到控制台

Flume3将内容打印到控制台

3.1编写代码

#Flume1.conf
#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2
#Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 6666
#channel selector
a1.sources.r1.selector.type = replicating
#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 7777
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 8888
#Sink processor
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
#优先级
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

#Flume2.conf
# example.conf: A single-node Flume configuration
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = localhost
a2.sources.r1.port = 7777
# Describe the sink
# sink
a2.sinks.k1.type = logger
# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
#Flume3.conf
#Named
a3.sources = r1
a3.channels = c1
a3.sinks = k1
#Source
a3.sources.r1.type = avro
a3.sources.r1.bind = localhost
a3.sources.r1.port = 8888
#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100
#Sink
# Describe the sink
a3.sinks.k1.type = logger
#Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

3.2执行命令:

先执行下游服务端,在执行客户端

flume-ng agent -c $FLUME_HOME/conf -f$FLUME_HOME/jobs/group3/flume3.conf -n a3 -Dflume.root.logger=INFO,console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/group3/flume2.conf -n a2 -Dflume.root.logger=INFO,console
再执行力上游的客户端
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/group3/flume1.conf -n a1 -Dflume.root.logger=INFO,console

3.3测试

nc localhost 6666

4,企业案例:聚合案例

Flume1(102) 监控文件内容

Flume2(103) 监控端口数据,Flume1和 Flume2将监控到数据发往Flume3

Flume3(104) 将内容打印到控制台

4.1编写代码

进入/opt/module/flume-1.9.0/jobs创建group4文件夹

mkdir group4
cd group4
mkdir position
vim flume1.conf
vim flume2.conf
vim flume3.conf
flume1.conf
#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#Source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/jobs/group4/.*\.txt
#产生监控数据存放的文件地址
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/jobs/group4/position/position.json
#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node4
a1.sinks.k1.port = 8888
#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
flume2.conf
a2.sources = r1
a2.channels = c1
a2.sinks = k1
#Source
a2.sources.r1.type = netcat
a2.sources.r1.bind = localhost
a2.sources.r1.port = 6666
#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100
#Sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = node4
a2.sinks.k1.port = 8888
#Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume3.conf
#Named
a3.sources = r1
a3.channels = c1
a3.sinks = k1
#Source
a3.sources.r1.type = avro
a3.sources.r1.bind = node4
a3.sources.r1.port = 8888
#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100
#Sink
a3.sinks.k1.type = logger
#Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

4.2 分发fulme文件

4.2.1发送程序给 node3 node4

cd /opt/module/进入目录下,分发文件
[itwise@node2 module]$ my_rsync.sh flume-1.9.0/  

进入并目录下,发送配置环境文件
[itwise@node2 bin]$ sudo ./my_rsync.sh /etc/profile.d/my_env.sh
确定yes 输入密码 :123456

进入 node3/4:进行测试,并且重启一下环境变量配置
/opt/module/flume-1.9.0/bin/flume-ng
itwise@node4 ~]$ source /etc/profile

4.3 启动命令

启动命令:
执行命令:
先执行:下游 服务端
在node4上启动
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/group4/flume3.conf -n a3 -Dflume.root.logger=INFO,console

在node3启动
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/group4/flume2.conf -n a2 -Dflume.root.logger=INFO,console

在执行,上游 客户端

在node2起动
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/group4/flume1.conf -n a1 -Dflume.root.logger=INFO,console

4,4测试

4.41在node2上 进入监控的文件夹
cd /opt/module/flume-1.9.0/jobs/group4
创建文本
[itwise@node2 group4]$ touch test4.txt
写入数据
[itwise@node2 group4]$ echo '我爱fulme' >> test4.txt

查看node4上监控的变化
2024-03-10 17:11:03,812 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: E6 88 91 E7 88 B1 66 75 6C 6D 65                ......fulme }

4.42在node3上看监控的端口
进入端口:输入内容
[itwise@node3 ~]$ nc localhost 6666
wo^H^H
OK
123456
OK
我爱flume
OK

查看node4上的变化:
2024-03-10 17:16:06,261 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 32 33 34 35 36                            123456 }
2024-03-10 17:16:28,278 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: E6 88 91 E7 88 B1 66 6C 75 6D 65
.....flume }

5.1自定义组件的编写

5.1自定义的拦截器:

业务:

要求:简单: flume1监听客户端端口号, 输入数据:

java:就到 发送给 Flume2 上, hadoop: 就发送到 Flume3上

5.1.1需要自定义一个拦截器:

1,首先创建一个maven工程:编写自己拦截器:

<dependencies>
	<dependency>
		<groupId>org.apache.flume</groupId>
		<artifactId>flume-ng-core</artifactId>
		<version>1.9.0</version>
	</dependency>
</dependencies>

2,编写自定义的拦截器:com.itwise.flume.interceptor.MyInterceptor

package com.itwise.flume.interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.List;
public class MyInterceptor implements Interceptor {
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
//实现逻辑 判断 //javaspring hadoop
String body = new String(event.getBody(), StandardCharsets.UTF_8);
if(body.startsWith("java")){
event.getHeaders().put("type", "java");
}else if(body.startsWith("hadoop")){
event.getHeaders().put("type", "hadoop");
}
return event;
}
@Override
public List<Event> intercept(List<Event> list) {
for (Event event : list) {
intercept(event);
}
return list;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder{
@Override
public Interceptor build() {
return new MyInterceptor();
}
@Override
public void configure(Context context) {
}
}
}

3,将接下来将:如上编写好的代码打包 放入 flume 的lib中:

5.2配置多路复用的代码:

在node2中创建目录:

flume1.conf
#Named
a1.sources = r1
a1.channels = c1 c2 c3
a1.sinks = k1 k2 k3
#Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 5555
#channel selector
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.java = c1
a1.sources.r1.selector.mapping.hadoop = c2
a1.sources.r1.selector.default = c3
# Interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type=com.itwise.flume.interceptor.MyInterceptor$Builder

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 100
a1.channels.c3.type = memory
a1.channels.c3.capacity = 10000
a1.channels.c3.transactionCapacity = 100
#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 6666
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 7777
a1.sinks.k3.type = avro
a1.sinks.k3.hostname = localhost
a1.sinks.k3.port = 8888
#Bind
a1.sources.r1.channels = c1 c2 c3
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
a1.sinks.k3.channel = c3
flume2.conf
a2.sources = r1
a2.channels = c1
a2.sinks = k1
#Source
a2.sources.r1.type = avro
a2.sources.r1.bind = localhost
a2.sources.r1.port = 6666
#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100
#Sink
a2.sinks.k1.type = logger
#Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume3.conf
#Named
a3.sources = r1
a3.channels = c1
a3.sinks = k1
#Source
a3.sources.r1.type = avro
a3.sources.r1.bind = localhost
a3.sources.r1.port = 7777
#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100
#Sink
a3.sinks.k1.type = logger
#Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
flume4.conf
#Named
a4.sources = r1
a4.channels = c1
a4.sinks = k1
#Source
a4.sources.r1.type = avro
a4.sources.r1.bind = localhost
a4.sources.r1.port = 8888
#Channel
a4.channels.c1.type = memory
a4.channels.c1.capacity = 10000
a4.channels.c1.transactionCapacity = 100
#Sink
a4.sinks.k1.type = logger
#Bind
a4.sources.r1.channels = c1
a4.sinks.k1.channel = c1

5.3 执行程序

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs//DIY1/flume4.conf -n a4 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/DIY1/flume3.conf -n a3 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs//DIY1/flume2.conf -n a2 -Dflume.root.logger=INFO,console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/DIY1/flume1.conf -n a1 -Dflume.root.logger=INFO,console

测试

nc localhost 5555
  • 6自定义:source

需求:flume1 使用自定义的source,收集随机生成一个数据,输出到日志。打印到控制台:

自定义的MySource,代码如下

package com.itwise.flume.source;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
 * 自定义Source 需要继承AbstractSource,实现 Configurable ,PollableSource接口.
 */
public class MySource extends AbstractSource implements Configurable,
        PollableSource {
    private String prefix;
    /**
     * Source的核心处理方法,
     *
     * 该方法在flume的处理流程中是循环调用的。
     * @return
     * @throws EventDeliveryException
     */
    @Override
    public Status process() throws EventDeliveryException {
//休眠一秒中
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Status status = null ;
        try {
// Receive new data
// 采集数据,封装成event对象
            Event e = getSomeData();
// Store the Event into this Source's associated Channel(s)
// 将event对象交给ChannelProcessor进行处理
            getChannelProcessor().processEvent(e);
// 正常处理,返回Status.READY
            status = Status.READY;
        } catch (Throwable t) {
// Log exception, handle individual exceptions as needed
// 处理失败,返回 Status.BACKOFF
            status = Status.BACKOFF;
        }
        return status;
    }
    /**
     * 随机生成一个字符串作为采集的数据
     * @return
     */
    private Event getSomeData() {
        String data = UUID.randomUUID().toString();
        String resultData = prefix + data ;
        SimpleEvent event = new SimpleEvent();
        event.setBody(resultData.getBytes(StandardCharsets.UTF_8));
        event.getHeaders().put("author","zdy");
        return event ;
    }
    /**
     * 规避时间的增长步长
     * @return
     */
    @Override
    public long getBackOffSleepIncrement() {
        return 1;
    }
    /**
     * 最大的规避时间
     * @return
     */
    @Override
    public long getMaxBackOffSleepInterval() {
        return 10;
    }
    /**
     * 用于读取flume的配置信息 xxx.conf
     * @param context
     */
    @Override
    public void configure(Context context) {
        prefix = context.getString("prefix","log-");
    }
}
flume4.conf
#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#Source
a1.sources.r1.type = com.itwise.flume.source.MySource
a1.sources.r1.prefix = log--
#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#Sink
a1.sinks.k1.type = logger
#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动执行

DIY2_MySource]$ 
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/DIY2_MySource/flume-mysource-logger.conf -n a1 -Dflume.root.logger=INFO,consolee

4.4 自定义:sink

需求:使用自定义的source采集数据,使用自定义的sink打印输出到控制台:

package com.itwise.flume.sink;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
/**
 * 自定义Sink ,需要继承Flume提供的AbstractSink,实现Configurable接口
 */
public class MySink extends AbstractSink implements Configurable {
    Logger logger = LoggerFactory.getLogger(MySink.class);
    /**
     * 核心处理方法
     *
     * 该方法在flume的处理流程中是循环调用的.
     * @return
     * @throws EventDeliveryException
     */
    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;
// Start transaction
//获取Channel
        Channel ch = getChannel();
//获取事务对象
        Transaction txn = ch.getTransaction();
//开启事务
        txn.begin();
        try {
// 从channel中获取event
            Event event = ch.take();
// 处理event
            storeSomeData(event);
// 处理成功,提交事务
            txn.commit();
            status = Status.READY;
        } catch (Throwable t) {
// 处理失败,回滚事务
            txn.rollback();
            status = Status.BACKOFF;
        } finally{
//不论事务成功与否。都要关闭
            txn.close();
        }
        return status;
    }
    private void storeSomeData(Event event) {
        String printData = event.getHeaders() + " ::: "+ new
                String(event.getBody(), StandardCharsets.UTF_8);
        logger.info(printData);
    }
    @Override
    public void configure(Context context) {
    }
}

配置文件:

#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#Source
a1.sources.r1.type = com.itwise.flume.source.MySource
a1.sources.r1.prefix = log--
#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#Sink
a1.sinks.k1.type = com.itwise.flume.sink.MySink
#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

执行命令

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/DIY3_MySink/mysource-flume-mysink.conf -n a1 -Dflume.root.logger=INFO,console
flume-ng agent \
-c $FLUME_HOME/conf \
-n a1 \
-f $FLUME_HOME/jobs/flume-netcat-logger.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=ganglia \
-Dflume.monitoring.hosts=node2:8649

6,简述flume事务控制原理?并画图说明

Put事务流程

•doCommit:检查channel内存队列是否足够合并。

•doRollback:channel内存队列空间不足,回滚数据

Take事务

•doTake:将数据取到临时缓冲区takeList,并将数据发送到HDFS

•doCommit:如果数据全部发送成功,则清除临时缓冲区takeList

•doRollback:数据发送过程中如果出现异常,rollback将临时缓冲

区takeList中的数据归还给channel内存队列。

7,Flume Agent 内部原理

ChannelSelector
ChannelSelector 的作用就是选出 Event 将要被发往哪个 Channel。其共有两种类型,分别是 Replicating(复制)和 Multiplexing(多路复用)。
ReplicatingSelector 会将同一个 Event 发往所有的 Channel,Multiplexing 会根据相应的原则,将不同的 Event 发往不同的 Channel。

SinkProcessor 
SinkProcessor共有三种类型分别是DefaultSinkProcessor LoadBalancingSinkProcessor 和FailoverSinkProcessor
DefaultSinkProcessor对应的是单个的Sink,LoadBalancingSinkProcessor和FailoverSinkProcessor 对应的是 Sink Group,LoadBalancingSinkProcessor 可以实现负载均衡的功能,FailoverSinkProcessor 可以错误恢复的功能。


8,Flume的拓扑结构

8.1点对点

这种是最简单的方式,两个flume,一个的sink是另一个的source,这种结构有点像链式结构,后面还可以接着加节点flume

数量过多不仅会影响传输速率,而且一旦传输过程中某个节点 flume 宕机,会影响整个传输系统。

8.2多副本复制和多路复用

这种结构特点: 一个source,多个channel,而多个channel是同一内容,只不过后面的sink不同,

8.3负载均衡和故障转移

这种结构在大数据领域经常使用,使用多个sink进行负载均衡Flume支持使用将多个sink逻辑上分到一个sink组,sink组配合不同的SinkProcessor可以实现负载均衡和错误恢复的功能。

8.4聚合

这种模式是我们最常见的,也非常实用,日常 web 应用通常分布在上百个服务器,大者甚至上千个、上万个服务器。产生的日志,处理起来也非常麻烦。用 flume 的这种组合方式能很好的解决这一问题,每台服务器部署一个 flume 采集日志,传送到一个集中收集日志的flume,再由此 flume 上传到 hdfs、hive、hbase 等,进行日志分析
Processor 和FailoverSinkProcessor
DefaultSinkProcessor对应的是单个的Sink,LoadBalancingSinkProcessor和FailoverSinkProcessor 对应的是 Sink Group,LoadBalancingSinkProcessor 可以实现负载均衡的功能,FailoverSinkProcessor 可以错误恢复的功能。文章来源地址https://www.toymoban.com/news/detail-853823.html


## 8,Flume的拓扑结构

### 8.1点对点

[外链图片转存中...(img-EIs3Jb4G-1712838835440)]

这种是最简单的方式,两个flume,一个的sink是另一个的source,这种结构有点像链式结构,后面还可以接着加节点flume 

数量过多不仅会影响传输速率,而且一旦传输过程中某个节点 flume 宕机,会影响整个传输系统。

### 8.2多副本**复制和多路复用**

[外链图片转存中...(img-tu8AE7aK-1712838835440)]

这种结构特点: 一个source,多个channel,而多个channel是同一内容,只不过后面的sink不同,

### 8.3**负载均衡和故障转移**

[外链图片转存中...(img-bZj6d51i-1712838835440)]

这种结构在大数据领域经常使用,使用多个sink进行负载均衡Flume支持使用将多个sink逻辑上分到一个sink组,sink组配合不同的SinkProcessor可以实现负载均衡和错误恢复的功能。

### 8.4聚合

[外链图片转存中...(img-OPs5HVCj-1712838835440)]

这种模式是我们最常见的,也非常实用,日常 web 应用通常分布在上百个服务器,大者甚至上千个、上万个服务器。产生的日志,处理起来也非常麻烦。用 flume 的这种组合方式能很好的解决这一问题,每台服务器部署一个 flume 采集日志,传送到一个集中收集日志的flume,再由此 flume 上传到 hdfs、hive、hbase 等,进行日志分析

到了这里,关于kafka2_企业级案例的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 企业级项目开发中保证接口安全的11个小技巧,详细案例指导

    企业级项目开发中保证接口安全的11个小技巧,详细案例指导。 如何保证接口的安全性? 1 参数校验 保证接口安全的第一步,也是最重要的一步,需要对接口的请求参数做校验。 如果我们把接口请求参数的校验做好了,真的可以拦截大部分的无效请求。 我们可以按如下步骤

    2024年01月23日
    浏览(25)
  • 【ELK企业级日志分析系统】部署Filebeat+Kafka+Logstash+Elasticsearch+Kibana集群详解(EFLFK)

    参见安装与部署ELK详解 参见安装与部署EFLK详解 参见安装与部署Zookeeper集群详解 1.1.1 为什么需要消息队列(MQ) MQ(Message Queue)主要原因是由于 在高并发环境下,同步请求来不及处理,请求往往会发生阻塞 。比如大量的并发请求,访问数据库,导致行锁表锁,最后请求线程会

    2024年02月16日
    浏览(23)
  • 企业级web开发框架-期中测试(答案由chatgpt被动提供)

    选择题 B、@RequestMapping A、SpringBoot默认不支持日志配置是错误的描述。SpringBoot默认使用Logback作为日志框架,并且可以通过配置文件进行日志的配置。 B、warn 我之前的回答有误,正确答案是D、info。因为debug级别包含了info级别,所以配置了debug级别后,info级别的日志信息也会被

    2024年02月07日
    浏览(26)
  • 干货-卷起来,企业级web自动化测试实战落地(三)

    Selenium 自动化测试方案 基础方案: 使用unittest,主要是用 TestCase(测试用例); 使用模块化(基本的模块化,抽取公共模块); 使用数据驱动的方式(主要包括 数据库的形式 和文件读取 ); 定制执行需要测试的用例,使用unittest,主要用的是TestSuite(测试套件)、和 TextTe

    2024年02月13日
    浏览(19)
  • 企业级数据仓库-理论知识

    大数据中间件 Hive:将SQL转化成分布式Map/Reduce进行运算,也支持转换成Spark,需要单独安装Hive集群才能访问Spark,支持60%的SQL,延迟比较大。 SparkSQL:属于Spark生态圈,Hive on Sqark。 HBase: NoSQL,高并发读,适合表结构频繁变动。 Impala: MPP架构的数据查询引擎,低层兼容Hive、Spark SQL,

    2024年02月07日
    浏览(25)
  • 企业级大数据体系结构

    作者:禅与计算机程序设计艺术 企业级大数据是指超大规模数据的集合,是管理者、分析师、决策者所需要分析和处理的一种信息资源。基于海量数据的复杂性及其多样性,实现数据可视化、数据挖掘、机器学习等数据处理功能的大数据平台也逐渐成为行业关注热点。因此,

    2024年02月06日
    浏览(23)
  • 从零实战本地服务器安装 Docker 部署 ThingsBoard PE 专业版(适用于 Postgres + Kafka 企业级生产环境)

    目录 1、准备工作 2、本地服务器Linux Centos7.9系统安装docker 2.1、检查Linux的内核版本 2.2、卸载Docker旧版本(若有需要) 2.3、安装Docker 2.4、安装Docker引擎 2.5、 启动docker和设置开机⾃启动 3、使用 Docker 安装 ThingsBoard PE 3.1、 拉取 ThingsBoard PE 镜像 3.2、获取许可证密钥 3.3、为

    2024年04月22日
    浏览(26)
  • 阿里云大数据——搭建企业级数据分析平台

    目录 1、数据分析介绍 1.1数据分析基本介绍 1.2数据分析目的 1.3数据分析平台组成部分 1.4数据分析平台对应的场景 2、阿里云数加介绍 2.1数加是什么? 2.2“数加”是阿里云大数据的核心能力 2.3数加平台发展历程 3、常用的数加产品 3.1数加产品框架图 3.2数据分析平台常用的数

    2023年04月11日
    浏览(65)
  • 企业级大数据安全架构(四)Ranger安装

    作者:楼高 Ranger是支持审计功能的,安装时可以选择审计数据保存的位置,默认支持Solr和HDFS。HDFS的配置比较简单,这里就不赘述了,我们这里使用Ambari默认自带的Solr保存审计日志,下面部署Solr: 选择Infra Solr add service 在ambari中solr是Infra Solr 可以导出配置信息 更多技术信息

    2024年01月20日
    浏览(20)
  • 大数据开发之Hive(企业级调优)

    创建测试用例 1、建大表、小表和JOIN后表的语句 2、分别向大表和小表中导入数据 1、基本语法 2、实例操作 1)查看下面这条语句的执行计划 (1)没有生成MR任务的 (2)有生成MR任务的 2)查看详细执行计划 10.2.1 列裁剪和分区裁剪 在生产环境中,会面临列很多或者数据量很

    2024年01月18日
    浏览(23)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包