diff --git a/functions-framework-api/pom.xml b/functions-framework-api/pom.xml index 167edba8..f81be26a 100644 --- a/functions-framework-api/pom.xml +++ b/functions-framework-api/pom.xml @@ -32,7 +32,7 @@ dev.openfunction.functions functions-framework-api - 1.0.0-SNAPSHOT + 1.1.0-SNAPSHOT UTF-8 diff --git a/functions-framework-api/src/main/java/dev/openfunction/functions/Context.java b/functions-framework-api/src/main/java/dev/openfunction/functions/Context.java index d65a8d87..d091d255 100644 --- a/functions-framework-api/src/main/java/dev/openfunction/functions/Context.java +++ b/functions-framework-api/src/main/java/dev/openfunction/functions/Context.java @@ -75,13 +75,6 @@ public interface Context { */ String getName(); - /** - * getMode returns the operating environment mode of the function. - * - * @return Mode - */ - String getMode(); - /** * GetOut returns the returned value of function. * @@ -89,13 +82,6 @@ public interface Context { */ Out getOut(); - /** - * getRuntime returns the Runtime. - * - * @return String - */ - String getRuntime(); - /** * getHttpPattern returns the path of the server listening for http function. * @@ -103,42 +89,10 @@ public interface Context { */ String getHttpPattern(); - - /** - * getInputs returns the Inputs of function. - * - * @return Inputs - */ - Map getInputs(); - /** * getOutputs returns the Outputs of function. * * @return Outputs */ Map getOutputs(); - - /** - * getPodName returns the name of the pod the function is running on. - * - * @return name of pod - */ - String getPodName(); - - /** - * getPodNamespace returns the namespace of the pod the function is running on. - * - * @return namespace of pod - */ - String getPodNamespace(); - - /** - * @return pre plugins - */ - Map getPrePlugins(); - - /** - * @return post plugin - */ - Map getPostPlugins(); } diff --git a/functions-framework-api/src/main/java/dev/openfunction/functions/Plugin.java b/functions-framework-api/src/main/java/dev/openfunction/functions/Plugin.java index de9380a5..d19b3bf5 100644 --- a/functions-framework-api/src/main/java/dev/openfunction/functions/Plugin.java +++ b/functions-framework-api/src/main/java/dev/openfunction/functions/Plugin.java @@ -16,6 +16,8 @@ package dev.openfunction.functions; +import java.util.Map; + public interface Plugin { /** * name return the name of this plugin. @@ -33,7 +35,7 @@ public interface Plugin { /** * init will create a new plugin, and execute hook in this calling. - * If you do not want to use a new plugin to execute hook, just return `nil`. + * If you do not want to use a new plugin to execute hook, just return `this`. * * @return Plugin */ @@ -63,4 +65,9 @@ public interface Plugin { * @return Object */ Object getField(String fieldName); + + Boolean needToTracing(); + + Map tagsAddToTracing(); + } diff --git a/functions-framework-api/src/main/java/dev/openfunction/functions/TopicEvent.java b/functions-framework-api/src/main/java/dev/openfunction/functions/TopicEvent.java index 9191c0a8..3bbb745a 100644 --- a/functions-framework-api/src/main/java/dev/openfunction/functions/TopicEvent.java +++ b/functions-framework-api/src/main/java/dev/openfunction/functions/TopicEvent.java @@ -17,6 +17,7 @@ package dev.openfunction.functions; import java.nio.ByteBuffer; +import java.util.Map; public class TopicEvent { /** @@ -60,7 +61,9 @@ public class TopicEvent { */ private final String topic; - public TopicEvent(String name, String id, String topic, String specversion, String source, String type, String datacontenttype, ByteBuffer data) { + private final Map extensions; + + public TopicEvent(String name, String id, String topic, String specversion, String source, String type, String datacontenttype, ByteBuffer data, Map extensions) { this.name = name; this.id = id; this.topic = topic; @@ -69,6 +72,7 @@ public TopicEvent(String name, String id, String topic, String specversion, Stri this.type = type; this.datacontenttype = datacontenttype; this.data = data; + this.extensions = extensions; } public String getName() { @@ -102,4 +106,8 @@ public ByteBuffer getData() { public String getTopic() { return topic; } + + public Map getExtensions() { + return this.extensions; + } } diff --git a/functions-framework-invoker/pom.xml b/functions-framework-invoker/pom.xml index e059380f..ddeb6830 100644 --- a/functions-framework-invoker/pom.xml +++ b/functions-framework-invoker/pom.xml @@ -10,7 +10,7 @@ dev.openfunction.functions functions-framework-invoker - 1.0.0-SNAPSHOT + 1.1.0-SNAPSHOT 3.8.0 @@ -19,7 +19,7 @@ 5.3.2 11 11 - 2.3.0 + 2.4.2 @@ -44,11 +44,23 @@ + + + + io.opentelemetry + opentelemetry-bom + 1.23.1 + pom + import + + + + dev.openfunction.functions functions-framework-api - 1.0.0-SNAPSHOT + 1.1.0-SNAPSHOT io.cloudevents @@ -63,7 +75,7 @@ com.google.code.gson gson - 2.9.0 + 2.10.1 com.ryanharter.auto.value @@ -80,50 +92,104 @@ com.google.auto.value auto-value - 1.9 + 1.10.1 provided com.google.auto.value auto-value-annotations - 1.9 + 1.10.1 provided org.eclipse.jetty jetty-servlet - 11.0.9 + 11.0.14 org.eclipse.jetty jetty-server - 11.0.9 + 11.0.14 io.dapr dapr-sdk - 1.5.0 + 1.8.0 io.dapr dapr-sdk-actors - 1.5.0 + 1.8.0 io.cloudevents cloudevents-http-basic - 2.3.0 + ${cloudevents.sdk.version} org.glassfish.jersey.core jersey-common - 3.0.5 + 3.1.1 org.slf4j slf4j-simple - 2.0.0-alpha6 + 2.0.5 + + + + io.opentelemetry + opentelemetry-api + + + io.opentelemetry.instrumentation + opentelemetry-instrumentation-api + 1.23.0 + + + io.opentelemetry + opentelemetry-sdk + + + io.opentelemetry + opentelemetry-sdk-common + + + io.opentelemetry + opentelemetry-sdk-extension-autoconfigure + 1.23.1-alpha + + + io.opentelemetry + opentelemetry-exporter-otlp + + + io.opentelemetry + opentelemetry-exporter-zipkin + + + io.opentelemetry + opentelemetry-exporter-jaeger + + + io.opentelemetry + opentelemetry-exporter-jaeger-proto + + + io.opentelemetry + opentelemetry-exporter-jaeger-thrift + + + io.opentelemetry + opentelemetry-semconv + 1.23.1-alpha + + + + commons-beanutils + commons-beanutils + 1.9.4 diff --git a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/Callback.java b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/Callback.java new file mode 100644 index 00000000..fb1f59d8 --- /dev/null +++ b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/Callback.java @@ -0,0 +1,5 @@ +package dev.openfunction.invoker; + +public interface Callback { + Error execute() throws Exception; +} diff --git a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/Runner.java b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/Runner.java index 6e60aadc..eaa6321e 100644 --- a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/Runner.java +++ b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/Runner.java @@ -44,8 +44,6 @@ public class Runner { private static final String FunctionContext = "FUNC_CONTEXT"; private static final String FunctionTarget = "FUNCTION_TARGET"; private static final String FunctionClasspath = "FUNCTION_CLASSPATH"; - private static final String SyncRuntime = "Knative"; - private static final String AsyncRuntime = "Async"; public static void main(String[] args) { @@ -66,9 +64,9 @@ public static void main(String[] args) { Runtime runtime; Class[] functionClasses = loadTargets(target, functionClassLoader); - if (Objects.equals(runtimeContext.getRuntime(), SyncRuntime)) { + if (Objects.equals(runtimeContext.getRuntime(), RuntimeContext.SyncRuntime)) { runtime = new SynchronizeRuntime(runtimeContext, functionClasses); - } else if (Objects.equals(runtimeContext.getRuntime(), AsyncRuntime)) { + } else if (Objects.equals(runtimeContext.getRuntime(), RuntimeContext.AsyncRuntime)) { runtime = new AsynchronousRuntime(runtimeContext, functionClasses); } else { throw new Exception("Unknown runtime"); diff --git a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/context/FunctionContext.java b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/context/FunctionContext.java index 49b7130c..8561bf46 100644 --- a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/context/FunctionContext.java +++ b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/context/FunctionContext.java @@ -17,9 +17,7 @@ package dev.openfunction.invoker.context; import dev.openfunction.functions.Component; -import kotlin.Pair; -import java.util.HashMap; import java.util.Map; class FunctionContext { @@ -107,84 +105,7 @@ public void setPluginsTracing(TracingConfig pluginsTracing) { this.pluginsTracing = pluginsTracing; } - class TracingConfig { - - private boolean enabled; - private TracingProvider provider; - private Map tags; - private Map baggage; - - public boolean isEnabled() { - return enabled; - } - - public void setEnabled(boolean enabled) { - this.enabled = enabled; - } - - public TracingProvider getProvider() { - return provider; - } - - public void setProvider(TracingProvider provider) { - this.provider = provider; - } - - public Map getTags() { - return tags; - } - - public void setTags(Map tags) { - this.tags = tags; - } - - public Map getBaggage() { - return baggage; - } - - public void setBaggage(Map baggage) { - this.baggage = baggage; - } - - private class TracingProvider { - private String Name; - private String oapServer; - - public String getName() { - return Name; - } - - public void setName(String name) { - Name = name; - } - - public String getOapServer() { - return oapServer; - } - - public void setOapServer(String oapServer) { - this.oapServer = oapServer; - } - } - } - public boolean isTracingEnabled() { - return pluginsTracing != null && pluginsTracing.isEnabled(); } - - public Pair getTracingProvider() { - if (pluginsTracing.provider == null) { - return new Pair<>("", ""); - } - return new Pair<>(getPluginsTracing().provider.getName(), getPluginsTracing().provider.getOapServer()); - } - - public Map getTracingTags() { - return pluginsTracing == null ? new HashMap<>() : getPluginsTracing().getTags(); - } - - public Map getTracingBaggage() { - return pluginsTracing == null ? new HashMap<>() : getPluginsTracing().getBaggage(); - } } diff --git a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/context/RuntimeContext.java b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/context/RuntimeContext.java index 988a50d2..807a6d22 100644 --- a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/context/RuntimeContext.java +++ b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/context/RuntimeContext.java @@ -18,11 +18,14 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; -import dev.openfunction.functions.Component; -import dev.openfunction.functions.Plugin; -import dev.openfunction.invoker.plugins.SkywalkingPlugin; -import kotlin.Pair; -import org.eclipse.jetty.util.ArrayUtil; +import dev.openfunction.functions.*; +import dev.openfunction.invoker.Callback; +import dev.openfunction.invoker.runtime.JsonEventFormat; +import dev.openfunction.invoker.tracing.OpenTelemetryProvider; +import dev.openfunction.invoker.tracing.SkywalkingProvider; +import dev.openfunction.invoker.tracing.TracingProvider; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.provider.EventFormatProvider; import java.util.HashMap; import java.util.Map; @@ -35,26 +38,22 @@ public class RuntimeContext { private static final Logger logger = Logger.getLogger("dev.openfunction.invoker"); private static final Gson GSON = new GsonBuilder().serializeNulls().create(); - private static final String ModeEnvName = "CONTEXT_MODE"; - private static final String KubernetesMode = "kubernetes"; - private static final String SelfHostMode = "self-host"; - private static final String PodNameEnvName = "POD_NAME"; - private static final String PodNamespaceEnvName = "POD_NAMESPACE"; + static final String PodNameEnvName = "POD_NAME"; + static final String PodNamespaceEnvName = "POD_NAMESPACE"; + public static final String SyncRuntime = "Knative"; + public static final String AsyncRuntime = "Async"; + private static final String TracingSkywalking = "skywalking"; private static final String TracingOpentelemetry = "opentelemetry"; private final FunctionContext functionContext; - private String mode; private final int port; - private String pod; - private String namespace; private Map prePlugins; private Map postPlugins; - private Plugin tracingPlugin; + private TracingProvider tracingProvider; public RuntimeContext(String context, ClassLoader classLoader) throws Exception { - functionContext = GSON.getAdapter(FunctionContext.class).fromJson(context); prePlugins = new HashMap<>(); @@ -62,64 +61,41 @@ public RuntimeContext(String context, ClassLoader classLoader) throws Exception port = Integer.parseInt(functionContext.getPort()); - mode = System.getenv(ModeEnvName); - if (!Objects.equals(mode, SelfHostMode)) { - mode = KubernetesMode; - } + loadPlugins(classLoader); - if (mode.equals(KubernetesMode)) { - pod = System.getenv(PodNameEnvName); - if (pod == null || pod.isEmpty()) { - throw new Error("environment variable `POD_NAME` not found"); + if (functionContext.isTracingEnabled() && functionContext.getPluginsTracing().getProvider() != null) { + String provider = functionContext.getPluginsTracing().getProvider().getName(); + if (!Objects.equals(provider, TracingSkywalking) && !Objects.equals(provider, TracingOpentelemetry)) { + throw new IllegalArgumentException("unsupported tracing provider " + provider); } - namespace = System.getenv(PodNamespaceEnvName); - if (pod == null || pod.isEmpty()) { - throw new Error("environment variable `POD_NAMESPACE` not found"); + switch (provider) { + case TracingSkywalking: + tracingProvider = new SkywalkingProvider(); + case TracingOpentelemetry: + tracingProvider = new OpenTelemetryProvider(functionContext.getPluginsTracing(), + getName(), + System.getenv(RuntimeContext.PodNameEnvName), + System.getenv(RuntimeContext.PodNamespaceEnvName)); } } - loadPlugins(classLoader); + EventFormatProvider.getInstance().registerFormat(new JsonEventFormat()); } private void loadPlugins(ClassLoader classLoader) { - - String[] prePluginNames = ArrayUtil.add(functionContext.getPrePlugins(), null); - String[] postPluginNames = ArrayUtil.add(functionContext.getPostPlugins(), null); - - if (functionContext.isTracingEnabled()) { - - Pair provider = functionContext.getTracingProvider(); - String providerName = provider.component1(); - if (!Objects.equals(providerName, TracingSkywalking) && !Objects.equals(providerName, TracingOpentelemetry)) { - throw new IllegalArgumentException("unknown tracing provider " + provider); - } - - prePluginNames = ArrayUtil.addToArray(prePluginNames, providerName, String.class); - postPluginNames = ArrayUtil.addToArray(postPluginNames, providerName, String.class); - - Map tags = functionContext.getTracingTags(); - tags.put("func", functionContext.getName()); - tags.put("instance", pod); - tags.put("namespace", namespace); - - tracingPlugin = new SkywalkingPlugin(provider.component2(), tags, functionContext.getTracingBaggage()); - } - - prePlugins = loadPlugins(classLoader, prePluginNames); - postPlugins = loadPlugins(classLoader, postPluginNames); + prePlugins = loadPlugins(classLoader, functionContext.getPrePlugins()); + postPlugins = loadPlugins(classLoader, functionContext.getPostPlugins()); } private Map loadPlugins(ClassLoader classLoader, String[] pluginNames) { Map plugins = new HashMap<>(); - if (pluginNames == null) { return plugins; } for (String name : pluginNames) { - if (Objects.equals(name, TracingSkywalking)) { - plugins.put(TracingSkywalking, tracingPlugin); + if (Objects.equals(name, TracingOpentelemetry) || Objects.equals(name, TracingSkywalking)) { continue; } @@ -128,7 +104,7 @@ private Map loadPlugins(ClassLoader classLoader, String[] plugin Class pluginImplClass = pluginClass.asSubclass(Plugin.class); plugins.put(name, pluginImplClass.getConstructor().newInstance()); } catch (Exception e) { - logger.log(Level.WARNING, "load plugin " + name +" error, " + e.getMessage()); + logger.log(Level.WARNING, "load plugin " + name + " error, " + e.getMessage()); e.printStackTrace(); } } @@ -164,15 +140,32 @@ public Map getOutputs() { return functionContext.getOutputs(); } - public String getMode() { - return mode; + public TracingProvider getTracingProvider() { + return tracingProvider; } - public String getPod() { - return pod; - } - - public String getNamespace() { - return namespace; + public void executeWithTracing(Object obj, Callback callback) throws Exception { + if (tracingProvider != null) { + if (obj == null) { + tracingProvider.executeWithTracing(callback); + } else if (obj instanceof HttpRequest) { + tracingProvider.executeWithTracing((HttpRequest) obj, callback); + } else if (obj instanceof CloudEvent) { + tracingProvider.executeWithTracing((CloudEvent) obj, callback); + } else if (obj.getClass().isAssignableFrom(TopicEvent.class)) { + tracingProvider.executeWithTracing((TopicEvent) obj, callback); + } else if (obj.getClass().isAssignableFrom(BindingEvent.class)) { + tracingProvider.executeWithTracing((BindingEvent) obj, callback); + } else if (obj.getClass().isAssignableFrom(UserContext.class)) { + tracingProvider.executeWithTracing((UserContext) obj, callback); + } else if (obj instanceof Plugin) { + tracingProvider.executeWithTracing((Plugin) obj, callback); + } + } else { + Error error = callback.execute(); + if (error != null) { + logger.log(Level.WARNING, "execute failed, ", error); + } + } } } diff --git a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/context/TracingConfig.java b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/context/TracingConfig.java new file mode 100644 index 00000000..b015a5cf --- /dev/null +++ b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/context/TracingConfig.java @@ -0,0 +1,131 @@ +package dev.openfunction.invoker.context; + +import java.time.Duration; +import java.util.Map; + +public class TracingConfig { + private boolean enabled; + private Provider provider; + private Map tags; + private Map baggage; + + public boolean isEnabled() { + return enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public Provider getProvider() { + return provider; + } + + public void setProvider(Provider provider) { + this.provider = provider; + } + + public Map getTags() { + return tags; + } + + public void setTags(Map tags) { + this.tags = tags; + } + + public Map getBaggage() { + return baggage; + } + + public void setBaggage(Map baggage) { + this.baggage = baggage; + } + + public static class Provider { + private String name; + private String oapServer; + private Exporter exporter; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Exporter getExporter() { + return exporter; + } + + public void setExporter(Exporter exporter) { + this.exporter = exporter; + } + + public String getOapServer() { + return oapServer; + } + + public void setOapServer(String oapServer) { + this.oapServer = oapServer; + } + } + + public static class Exporter { + private String name; + private String endpoint; + private Map headers; + private String compression; + private Duration timeout; + private String protocol; + + + public String getProtocol() { + return protocol; + } + + public void setProtocol(String protocol) { + this.protocol = protocol; + } + + public Duration getTimeout() { + return timeout; + } + + public void setTimeout(Duration timeout) { + this.timeout = timeout; + } + + public String getCompression() { + return compression; + } + + public void setCompression(String compression) { + this.compression = compression; + } + + public Map getHeaders() { + return headers; + } + + public void setHeaders(Map headers) { + this.headers = headers; + } + + public String getEndpoint() { + return endpoint; + } + + public void setEndpoint(String endpoint) { + this.endpoint = endpoint; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + } +} diff --git a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/context/UserContext.java b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/context/UserContext.java index 72c120c8..d0f39e11 100644 --- a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/context/UserContext.java +++ b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/context/UserContext.java @@ -17,11 +17,15 @@ package dev.openfunction.invoker.context; import dev.openfunction.functions.*; +import dev.openfunction.invoker.Callback; +import dev.openfunction.invoker.runtime.JsonEventFormat; import io.cloudevents.CloudEvent; +import io.cloudevents.core.v03.CloudEventBuilder; import io.dapr.client.DaprClient; +import jakarta.servlet.http.HttpServletResponse; -import java.util.HashMap; -import java.util.Map; +import java.net.URI; +import java.util.*; import java.util.logging.Level; import java.util.logging.Logger; @@ -29,26 +33,22 @@ public class UserContext implements Context { private static final Logger logger = Logger.getLogger("dev.openfunction.invoker"); - private static final Map bindingQueueComponents = Map.of( - "bindings.kafka", true, - "bindings.rabbitmq", true, - "bindings.aws.sqs", true, - "bindings.aws.kinesis", true, - "bindings.gcp.pubsub", true, - "bindings.azure.eventgrid", true, - "bindings.azure.eventhubs", true, - "bindings.azure.servicebusqueues", true, - "bindings.azure.storagequeues", true - ); - public static final String OpenFuncBinding = "bindings"; public static final String OpenFuncTopic = "pubsub"; - private RuntimeContext runtimeContext; - private DaprClient daprClient; + private static final Set MiddlewaresCloudEventFormatReqired = Set.of( + "kafka", + "kubemq", + "mqtt3", + "rabbitmq", + "redis", + "gcp.pubsub", + "azure.eventhubs" + ); + + private final RuntimeContext runtimeContext; + private final DaprClient daprClient; - private Map prePlugins; - private Map postPlugins; private Out out; private BindingEvent bindingEvent; @@ -58,43 +58,34 @@ public class UserContext implements Context { private HttpRequest httpRequest; private HttpResponse httpResponse; - public UserContext(RuntimeContext runtimeContext, DaprClient daprClient, BindingEvent event) { - init(runtimeContext, daprClient); - this.bindingEvent = event; - } + private Object function; - public UserContext(RuntimeContext runtimeContext, DaprClient daprClient, TopicEvent event) { - init(runtimeContext, daprClient); - this.topicEvent = event; + public UserContext(RuntimeContext runtimeContext, DaprClient daprClient) { + this.runtimeContext = runtimeContext; + this.daprClient = daprClient; } - public UserContext(RuntimeContext runtimeContext, DaprClient daprClient, HttpRequest httpRequest, HttpResponse httpResponse) { - init(runtimeContext, daprClient); + public UserContext withHttp(HttpRequest httpRequest, HttpResponse httpResponse) { this.httpRequest = httpRequest; this.httpResponse = httpResponse; + return this; } - private void init(RuntimeContext runtimeContext, DaprClient daprClient) { - this.runtimeContext = runtimeContext; - - Map plugins = new HashMap<>(); - for (String name : runtimeContext.getPrePlugins().keySet()) { - plugins.put(name, (runtimeContext.getPrePlugins().get(name).init())); - } - prePlugins = plugins; - - plugins = new HashMap<>(); - for (String name : runtimeContext.getPostPlugins().keySet()) { - plugins.put(name, (runtimeContext.getPostPlugins().get(name).init())); - } - postPlugins = plugins; + public UserContext withBindingEvent(BindingEvent event) { + this.bindingEvent = event; + return this; + } - this.daprClient = daprClient; + public UserContext withTopicEvent(TopicEvent event) { + this.topicEvent = event; + return this; } @Override public Error send(String outputName, String data) { - + if (data == null) { + return null; + } Map outputs = runtimeContext.getOutputs(); if (outputs.isEmpty()) { return new Error("no output"); @@ -105,39 +96,32 @@ public Error send(String outputName, String data) { return new Error("output " + outputName + " not found"); } - String payload = data; - // Convert queue binding event into cloud event format to add tracing metadata in the cloud event context. - if (isTraceable(output.getComponentType())) { - } - if (output.getComponentType().startsWith(OpenFuncTopic)) { - daprClient.publishEvent(output.getComponentName(), output.getUri(), payload); + daprClient.publishEvent(output.getComponentName(), output.getUri(), data); } else if (output.getComponentType().startsWith(OpenFuncBinding)) { - daprClient.invokeBinding(output.getComponentName(), output.getOperation(), payload.getBytes(), output.getMetadata()).block(); + // If a middleware supports both binding and pubsub, then the data send to + // binding must be in CloudEvent format, otherwise pubsub cannot parse the data. + byte[] payload = data.getBytes(); + if (MiddlewaresCloudEventFormatReqired.contains(output.getComponentType().substring(OpenFuncBinding.length() + 1))) { + CloudEvent event = new CloudEventBuilder() + .withId(UUID.randomUUID().toString()) + .withType("dapr.invoke") + .withSource(URI.create("openfunction/invokeBinding")) + .withData(data.getBytes()) + .withDataContentType(JsonEventFormat.CONTENT_TYPE) + .withSubject(output.getUri()) + .build(); + payload = new JsonEventFormat().serialize(event); + } + + daprClient.invokeBinding(output.getComponentName(), output.getOperation(), payload).block(); } else { - return new Error("unknown output type " + output.getComponentType()); + return new Error("unsupported output type " + output.getComponentType()); } return null; } - /** - * isTraceable Convert queue binding event into cloud event format to add tracing metadata in the cloud event context. - * - * @param t output type - * @return Boolean - */ - private Boolean isTraceable(String t) { - - if (t.startsWith("pubsub")) { - return true; - } - - // For dapr binding components, let the mapping conditions of the bindingQueueComponents - // determine if the tracing metadata can be added. - return bindingQueueComponents.get(t); - } - @Override public HttpRequest getHttpRequest() { return httpRequest; @@ -168,82 +152,120 @@ public String getName() { return runtimeContext.getName(); } - @Override - public String getMode() { - return runtimeContext.getMode(); - } - @Override public Out getOut() { return out; } - @Override - public String getRuntime() { - return runtimeContext.getRuntime(); - } - @Override public String getHttpPattern() { return null; } - @Override - public Map getInputs() { - return runtimeContext.getInputs(); - } - @Override public Map getOutputs() { return runtimeContext.getOutputs(); } - @Override - public String getPodName() { - return runtimeContext.getPod(); + public Map getInputs() { + return runtimeContext.getInputs(); } - @Override - public String getPodNamespace() { - return runtimeContext.getNamespace(); + public Class getFunctionClass() { + return function.getClass(); } - @Override - public Map getPrePlugins() { - return prePlugins; + private void executePrePlugins() throws Exception { + for (String name : runtimeContext.getPrePlugins().keySet()) { + Plugin plugin = runtimeContext.getPrePlugins().get(name).init(); + if (plugin.needToTracing()) { + runtimeContext.executeWithTracing(plugin, () -> { + Error error = plugin.execPreHook(UserContext.this); + if (error != null) { + logger.log(Level.SEVERE, "execute plugin " + plugin.name() + ":" + plugin.version() + " error", error); + } + + return error; + }); + } else { + Error error = plugin.execPreHook(UserContext.this); + if (error != null) { + logger.log(Level.SEVERE, "execute plugin " + plugin.name() + ":" + plugin.version() + " error", error); + } + } + } } - @Override - public Map getPostPlugins() { - return postPlugins; + private void executePostPlugins() throws Exception { + for (String name : runtimeContext.getPostPlugins().keySet()) { + Plugin plugin = runtimeContext.getPostPlugins().get(name).init(); + if (plugin.needToTracing()) { + runtimeContext.executeWithTracing(plugin, () -> { + Error error = plugin.execPostHook(UserContext.this); + if (error != null) { + logger.log(Level.SEVERE, "execute plugin " + plugin.name() + ":" + plugin.version() + " error", error); + } + + return error; + }); + } else { + Error error = plugin.execPostHook(UserContext.this); + if (error != null) { + logger.log(Level.SEVERE, "execute plugin " + plugin.name() + ":" + plugin.version() + " error", error); + } + } + } } - public void setCloudEvent(CloudEvent cloudEvent) { - this.cloudEvent = cloudEvent; + public void executeFunction(HttpFunction function) throws Exception { + this.function = function; + executeFunction(() -> { + function.service(this.httpRequest, this.httpResponse); + return null; + }); } - public void setOut(Out out) { - this.out = out; + public void executeFunction(CloudEventFunction function, CloudEvent event) throws Exception { + this.function = function; + this.cloudEvent = event; + executeFunction(() -> { + Error err = function.accept(UserContext.this, event); + if (err == null) { + httpResponse.setStatusCode(HttpServletResponse.SC_OK); + httpResponse.getOutputStream().write("Success".getBytes()); + } else { + httpResponse.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + httpResponse.getOutputStream().write(err.getMessage().getBytes()); + } + return null; + }); } - public void executePrePlugins() { - - for (String name : getPrePlugins().keySet()) { - Plugin plugin = getPrePlugins().get(name); - Error error = plugin.execPreHook(this); - if (error != null) { - logger.log(Level.SEVERE, "execute plugin " + plugin.name() + ":" + plugin.version() + " error", error); + public void executeFunction(OpenFunction function, String payload) throws Exception { + this.function = function; + executeFunction(() -> { + out = function.accept(UserContext.this, payload); + if (httpResponse != null) { + if (out == null || out.getError() == null) { + httpResponse.setStatusCode(HttpServletResponse.SC_OK); + httpResponse.getOutputStream().write("Success".getBytes()); + } else { + httpResponse.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + httpResponse.getOutputStream().write(out.getError().getMessage().getBytes()); + } } - } + + return out == null ? null : out.getError(); + }); } - public void executePostPlugins() { - for (String name : getPostPlugins().keySet()) { - Plugin plugin = getPostPlugins().get(name); - Error error = plugin.execPostHook(this); - if (error != null) { - logger.log(Level.SEVERE, "execute plugin " + plugin.name() + ":" + plugin.version() + " error", error); - } - } + private void executeFunction(Callback callBack) throws Exception { + runtimeContext.executeWithTracing(this, + () -> { + executePrePlugins(); + runtimeContext.executeWithTracing(null, callBack); + executePostPlugins(); + return null; + }); } } diff --git a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/plugins/SkywalkingPlugin.java b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/plugins/SkywalkingPlugin.java deleted file mode 100644 index 83b8957c..00000000 --- a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/plugins/SkywalkingPlugin.java +++ /dev/null @@ -1,73 +0,0 @@ -/* -Copyright 2022 The OpenFunction Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package dev.openfunction.invoker.plugins; - -import dev.openfunction.functions.Context; -import dev.openfunction.functions.Plugin; - -import java.util.Map; - -public class SkywalkingPlugin implements Plugin { - - private static final String Name = "skywalking"; - private static final String Version = "v1"; - - /** - * ... - */ - private static final int componentIDOpenFunction = 5013; - - private String oapServer; - private Map tags; - private Map baggage; - - public SkywalkingPlugin(String oapServer, Map tags, Map baggage) { - this.oapServer = oapServer; - this.tags = tags; - this.baggage = baggage; - } - - @Override - public String name() { - return Name; - } - - @Override - public String version() { - return Version; - } - - @Override - public Plugin init() { - return this; - } - - @Override - public Error execPreHook(Context ctx) { - return null; - } - - @Override - public Error execPostHook(Context ctx) { - return null; - } - - @Override - public Object getField(String fieldName) { - return null; - } -} diff --git a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/runtime/AsynchronousRuntime.java b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/runtime/AsynchronousRuntime.java index cc5d81fd..c2eaf133 100644 --- a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/runtime/AsynchronousRuntime.java +++ b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/runtime/AsynchronousRuntime.java @@ -16,7 +16,11 @@ package dev.openfunction.invoker.runtime; -import dev.openfunction.functions.*; +import com.google.protobuf.Value; +import dev.openfunction.functions.BindingEvent; +import dev.openfunction.functions.Component; +import dev.openfunction.functions.OpenFunction; +import dev.openfunction.functions.TopicEvent; import dev.openfunction.invoker.context.RuntimeContext; import dev.openfunction.invoker.context.UserContext; import io.dapr.client.DaprClient; @@ -27,9 +31,7 @@ import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.logging.Level; import java.util.logging.Logger; @@ -127,24 +129,24 @@ public void listInputBindings(com.google.protobuf.Empty request, @Override public void onBindingEvent(DaprAppCallbackProtos.BindingEventRequest request, StreamObserver responseObserver) { - try { - for (OpenFunction function : functions) { - BindingEvent event = new BindingEvent(request.getName(), request.getMetadataMap(), request.getData().asReadOnlyByteBuffer()); - UserContext userContext = new UserContext(runtimeContext, daprClient, event); - - userContext.executePrePlugins(); - Out out = function.accept(userContext, request.getData().toStringUtf8()); - userContext.setOut(out); - userContext.executePostPlugins(); + BindingEvent event = new BindingEvent(request.getName(), request.getMetadataMap(), request.getData().asReadOnlyByteBuffer()); - responseObserver.onNext(DaprAppCallbackProtos.BindingEventResponse.getDefaultInstance()); - } + try { + runtimeContext.executeWithTracing(event, () -> { + for (OpenFunction function : functions) { + new UserContext(runtimeContext, daprClient). + withBindingEvent(event). + executeFunction(function, request.getData().toStringUtf8()); + } + responseObserver.onNext(DaprAppCallbackProtos.BindingEventResponse.getDefaultInstance()); + responseObserver.onCompleted(); + return null; + } + ); } catch (Exception e) { logger.log(Level.INFO, "catch exception when execute function " + runtimeContext.getName()); e.printStackTrace(); responseObserver.onError(e); - } finally { - responseObserver.onCompleted(); } } @@ -163,33 +165,48 @@ public void listTopicSubscriptions(com.google.protobuf.Empty request, } @Override - public void onTopicEvent(io.dapr.v1.DaprAppCallbackProtos.TopicEventRequest request, + public void onTopicEvent(DaprAppCallbackProtos.TopicEventRequest request, io.grpc.stub.StreamObserver responseObserver) { + TopicEvent event = new TopicEvent(request.getPubsubName(), + request.getId(), + request.getTopic(), + request.getSpecVersion(), + request.getSource(), + request.getType(), + request.getDataContentType(), + request.getData().asReadOnlyByteBuffer(), + getExtensions(request)); + try { - for (OpenFunction function: functions) { - TopicEvent event = new TopicEvent(request.getPubsubName(), - request.getId(), - request.getTopic(), - request.getSpecVersion(), - request.getSource(), - request.getType(), - request.getDataContentType(), - request.getData().asReadOnlyByteBuffer()); - UserContext userContext = new UserContext(runtimeContext, daprClient, event); - userContext.executePrePlugins(); - Out out = function.accept(userContext, request.getData().toStringUtf8()); - userContext.setOut(out); - userContext.executePostPlugins(); - - responseObserver.onNext(DaprAppCallbackProtos.TopicEventResponse.getDefaultInstance()); - } + runtimeContext.executeWithTracing(event, () -> { + for (OpenFunction function : functions) { + new UserContext(runtimeContext, daprClient). + withTopicEvent(event). + executeFunction(function, request.getData().toStringUtf8()); + } + responseObserver.onNext(DaprAppCallbackProtos.TopicEventResponse.getDefaultInstance()); + responseObserver.onCompleted(); + return null; + } + ); } catch (Exception e) { logger.log(Level.INFO, "catch exception when execute function " + runtimeContext.getName()); e.printStackTrace(); responseObserver.onError(e); - } finally { - responseObserver.onCompleted(); } } } + + private Map getExtensions(DaprAppCallbackProtos.TopicEventRequest request) { + Map extensions = new HashMap<>(); + Map fields = request.getExtensions().getFieldsMap(); + for (String key : fields.keySet()) { + Value value = fields.get(key); + if (value.hasStringValue()) { + extensions.put(key, value.getStringValue()); + } + } + + return extensions; + } } diff --git a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/runtime/JsonEventFormat.java b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/runtime/JsonEventFormat.java index 0bc4ace2..95207bfd 100644 --- a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/runtime/JsonEventFormat.java +++ b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/runtime/JsonEventFormat.java @@ -18,6 +18,7 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; +import com.google.gson.JsonElement; import com.google.gson.JsonObject; import io.cloudevents.CloudEvent; import io.cloudevents.CloudEventData; @@ -25,6 +26,7 @@ import io.cloudevents.core.format.EventDeserializationException; import io.cloudevents.core.format.EventFormat; import io.cloudevents.core.format.EventSerializationException; +import io.cloudevents.core.v03.CloudEventV03; import io.cloudevents.core.v1.CloudEventV1; import io.cloudevents.rw.CloudEventDataMapper; import org.jetbrains.annotations.NotNull; @@ -34,16 +36,29 @@ import java.time.OffsetDateTime; import java.util.HashMap; import java.util.Map; +import java.util.Objects; public class JsonEventFormat implements EventFormat { - private static final String CONTENT_TYPE = "application/cloudevents+json"; + public static final String CONTENT_TYPE = "application/cloudevents+json"; private static final Gson GSON = new GsonBuilder().serializeNulls().create(); @Override public byte[] serialize(@NotNull CloudEvent event) throws EventSerializationException { - return GSON.toJson(event).getBytes(); + JsonObject root = GSON.toJsonTree(event).getAsJsonObject(); + for (String key : event.getExtensionNames()) { + root.add(key, GSON.toJsonTree(event.getExtension(key))); + } + + if (root.get("traceparent") != null) { + String traceparent = root.get("traceparent").getAsString(); + if (!Objects.equals(traceparent, "")) { + root.add("traceid", GSON.toJsonTree(traceparent)); + } + } + + return root.toString().getBytes(); } @Override @@ -56,44 +71,55 @@ public CloudEvent deserialize(@NotNull byte[] bytes, @NotNull CloudEventDataMapp URI source = null; String type = null; String datacontenttype = null; - URI dataschema = null; + URI schemaurl = null; String subject = null; OffsetDateTime time = null; BytesCloudEventData data = null; Map extensions = new HashMap<>(); for (String key : jsonObject.keySet()) { + JsonElement element = jsonObject.get(key); + if (element.isJsonNull()) { + continue; + } + switch (key) { - case CloudEventV1.ID: - id = jsonObject.get(key).getAsString(); + case CloudEventV03.ID: + id = element.getAsString(); break; - case CloudEventV1.SOURCE: - source = new URI(jsonObject.get(key).getAsString()); + case CloudEventV03.SOURCE: + source = new URI(element.getAsString()); break; - case CloudEventV1.TYPE: - type = jsonObject.get(key).getAsString(); + case CloudEventV03.TYPE: + type = element.getAsString(); break; - case CloudEventV1.DATACONTENTTYPE: - datacontenttype = jsonObject.get(key).getAsString(); + case CloudEventV03.DATACONTENTTYPE: + datacontenttype = element.getAsString(); break; - case CloudEventV1.DATASCHEMA: - dataschema = new URI(jsonObject.get(key).getAsString()); + case CloudEventV03.SCHEMAURL: + schemaurl = new URI(element.getAsString()); break; case CloudEventV1.SUBJECT: - subject = jsonObject.get(key).getAsString(); + subject = element.getAsString(); break; case CloudEventV1.TIME: - time = OffsetDateTime.parse(jsonObject.get(key).getAsString()); + time = OffsetDateTime.parse(element.getAsString()); break; case "data": - data = BytesCloudEventData.wrap(jsonObject.get(key).getAsString().getBytes()); + data = BytesCloudEventData.wrap(element.getAsString().getBytes()); + break; + case "extensions": + JsonObject extensionsObject= element.getAsJsonObject(); + for (String extensionKey : extensionsObject.keySet()) { + extensions.put(extensionKey, extensionsObject.get(extensionKey).getAsString()); + } break; default: - extensions.put(key, jsonObject.get(key).getAsString()); + extensions.put(key, element.getAsString()); break; } } - return new CloudEventV1(id, source, type, datacontenttype, dataschema, subject, time, data, extensions); + return new CloudEventV03(id, source, type, time, schemaurl, datacontenttype, subject, data, extensions); } catch (Exception e) { throw new EventDeserializationException(e); } diff --git a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/runtime/SynchronizeRuntime.java b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/runtime/SynchronizeRuntime.java index f6f4767a..e2369f65 100644 --- a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/runtime/SynchronizeRuntime.java +++ b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/runtime/SynchronizeRuntime.java @@ -16,14 +16,16 @@ package dev.openfunction.invoker.runtime; -import dev.openfunction.functions.*; +import dev.openfunction.functions.CloudEventFunction; +import dev.openfunction.functions.HttpFunction; +import dev.openfunction.functions.OpenFunction; +import dev.openfunction.functions.Routable; import dev.openfunction.invoker.context.RuntimeContext; import dev.openfunction.invoker.context.UserContext; import dev.openfunction.invoker.http.HttpRequestImpl; import dev.openfunction.invoker.http.HttpResponseImpl; import io.cloudevents.CloudEvent; import io.cloudevents.core.message.MessageReader; -import io.cloudevents.core.provider.EventFormatProvider; import io.cloudevents.http.HttpMessageFactory; import io.dapr.client.DaprClient; import io.dapr.client.DaprClientBuilder; @@ -35,7 +37,6 @@ import org.eclipse.jetty.servlet.ServletHolder; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; import java.util.logging.Level; @@ -56,13 +57,12 @@ public class SynchronizeRuntime extends HttpServlet implements Runtime { public SynchronizeRuntime(RuntimeContext runtimeContext, Class[] functionClasses) { this.runtimeContext = runtimeContext; this.functionClasses = functionClasses; - EventFormatProvider.getInstance().registerFormat(new JsonEventFormat()); } @Override public void start() throws Exception { - // create dapr client when dapr sidecar enabled. - if (System.getenv("DAPR_GRPC_PORT") != null || System.getenv("DAPR_HTTP_PORT") != null) { + if ((runtimeContext.getInputs() != null && !runtimeContext.getInputs().isEmpty()) || + (runtimeContext.getOutputs() != null && !runtimeContext.getOutputs().isEmpty())) { daprClient = new DaprClientBuilder().build(); daprClient.waitForSidecar(Runtime.WaitDaprSidecarTimeout); } @@ -124,64 +124,27 @@ public void service(HttpServletRequest req, HttpServletResponse res) { } } - UserContext userContext = new UserContext(runtimeContext, daprClient, reqImpl, respImpl); + UserContext userContext = new UserContext(runtimeContext, daprClient). + withHttp(reqImpl, respImpl); if (HttpFunction.class.isAssignableFrom(function.getClass())) { - userContext.executePrePlugins(); - ((HttpFunction) function).service(reqImpl, respImpl); - - if (userContext.getOut() == null) { - userContext.setOut(new Out().setCode(respImpl.getStatusCode())); - } - - userContext.executePostPlugins(); + runtimeContext.executeWithTracing(reqImpl, () -> { + userContext.executeFunction(((HttpFunction) function)); + return null; + } + ); } else if (CloudEventFunction.class.isAssignableFrom(function.getClass())) { MessageReader messageReader = HttpMessageFactory.createReaderFromMultimap(reqImpl.getHeaders(), reqImpl.getInputStream().readAllBytes()); CloudEvent event = messageReader.toEvent(); - userContext.setCloudEvent(event); - - userContext.executePrePlugins(); - Error err = ((CloudEventFunction) function).accept(userContext, event); - - if (userContext.getOut() == null) { - userContext.setOut(new Out().setError(err)); - } - - userContext.executePostPlugins(); - if (userContext.getOut() == null) { - userContext.setOut(new Out().setError(err)); - } - if (userContext.getOut().getData() == null) { - userContext.getOut().setData(ByteBuffer.wrap(("Success".getBytes()))); - } - - if (err == null) { - respImpl.setStatusCode(HttpServletResponse.SC_OK); - } else { - respImpl.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); - } - - respImpl.getOutputStream().write(userContext.getOut().getData().array()); + runtimeContext.executeWithTracing(event, () -> { + userContext.executeFunction((CloudEventFunction) function, event); + return null; + }); } else if (OpenFunction.class.isAssignableFrom(function.getClass())) { - userContext.executePrePlugins(); - Out out = ((OpenFunction) function).accept(userContext, new String(reqImpl.getInputStream().readAllBytes())); - userContext.setOut(out); - userContext.executePostPlugins(); - - if (userContext.getOut() == null) { - userContext.setOut(out); - } - - if (userContext.getOut().getData() == null) { - userContext.getOut().setData(ByteBuffer.wrap(("Success".getBytes()))); - } - - if (out.getError() == null) { - respImpl.setStatusCode(HttpServletResponse.SC_OK); - } else { - respImpl.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); - } - - respImpl.getOutputStream().write(userContext.getOut().getData().array()); + runtimeContext.executeWithTracing(reqImpl, () -> { + userContext.executeFunction((OpenFunction) function, new String(reqImpl.getInputStream().readAllBytes())); + return null; + } + ); } } catch (Throwable t) { logger.log(Level.SEVERE, "Failed to execute function", t); diff --git a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/tracing/OpenTelemetryProvider.java b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/tracing/OpenTelemetryProvider.java new file mode 100644 index 00000000..5dccb9a1 --- /dev/null +++ b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/tracing/OpenTelemetryProvider.java @@ -0,0 +1,292 @@ +package dev.openfunction.invoker.tracing; + +import dev.openfunction.functions.*; +import dev.openfunction.invoker.Callback; +import dev.openfunction.invoker.context.TracingConfig; +import dev.openfunction.invoker.context.UserContext; +import io.cloudevents.CloudEvent; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.context.propagation.TextMapGetter; +import io.opentelemetry.context.propagation.TextMapPropagator; +import io.opentelemetry.exporter.jaeger.JaegerGrpcSpanExporter; +import io.opentelemetry.exporter.jaeger.JaegerGrpcSpanExporterBuilder; +import io.opentelemetry.exporter.jaeger.thrift.JaegerThriftSpanExporter; +import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter; +import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporterBuilder; +import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter; +import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporterBuilder; +import io.opentelemetry.exporter.zipkin.ZipkinSpanExporter; +import io.opentelemetry.exporter.zipkin.ZipkinSpanExporterBuilder; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.OpenTelemetrySdkBuilder; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import org.jetbrains.annotations.NotNull; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +public class OpenTelemetryProvider implements TracingProvider { + private static final String OTEL_LIBRARY_NAME = "opentelemetry-java"; + private static final String OTEL_LIBRARY_VERSION = "1.23.1"; + + private static final String Protocol_HTTP = "http"; + + private final String functionName; + private final Map tags; + private final Map baggage; + private final TextMapGetter> getter; + + public OpenTelemetryProvider(TracingConfig config, String functionName, String pod, String namespace) throws Exception { + this.functionName = functionName; + + OpenTelemetrySdkBuilder openTelemetrySdkBuilder = OpenTelemetrySdk.builder() + .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance())); + TracingConfig.Exporter exporter = config.getProvider().getExporter(); + if (exporter != null) { + SpanExporter spanExporter; + String exporterName = config.getProvider().getExporter().getName(); + switch (exporterName) { + case "otlp": + spanExporter = createOtlpExporter(exporter); + break; + case "jaeger": + spanExporter = createJaegerExporter(exporter); + break; + case "zipkin": + ZipkinSpanExporterBuilder builder = ZipkinSpanExporter.builder().setEndpoint(exporter.getEndpoint()); + if (exporter.getTimeout() != null && !exporter.getTimeout().isZero()) { + builder.setReadTimeout(exporter.getTimeout()); + } + if (exporter.getCompression() != null && !Objects.equals(exporter.getCompression(), "")) { + builder.setCompression(exporter.getCompression()); + } + spanExporter = builder.build(); + break; + default: + throw new Exception("opentelemetry export not set, use default"); + } + + Resource resource = Resource.getDefault() + .merge(Resource.create(Attributes.of(ResourceAttributes.SERVICE_NAME, functionName))); + + SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder() + .addSpanProcessor(BatchSpanProcessor.builder(spanExporter).build()) + .setResource(resource) + .build(); + openTelemetrySdkBuilder.setTracerProvider(sdkTracerProvider); + } + + openTelemetrySdkBuilder.buildAndRegisterGlobal(); + + getter = new TextMapGetter<>() { + @Override + public Iterable keys(@NotNull Map carrier) { + return carrier.keySet(); + } + + @Override + public String get(Map carrier, @NotNull String key) { + return carrier.get(key); + } + }; + + tags = config.getTags(); + if (!Objects.equals(pod, "")) { + tags.put("instance", pod); + } + if (!Objects.equals(namespace, "")) { + tags.put("namespace", pod); + } + + baggage = config.getBaggage(); + } + + private static SpanExporter createOtlpExporter(TracingConfig.Exporter exporter) { + String protocol = exporter.getProtocol(); + if (protocol != null && Objects.equals(protocol, Protocol_HTTP)) { + OtlpHttpSpanExporterBuilder builder = OtlpHttpSpanExporter.builder().setEndpoint(exporter.getEndpoint()); + if (exporter.getTimeout() != null && !exporter.getTimeout().isZero()) { + builder.setTimeout(exporter.getTimeout()); + } + if (exporter.getCompression() != null && !Objects.equals(exporter.getCompression(), "")) { + builder.setCompression(exporter.getCompression()); + } + if (exporter.getHeaders() != null) { + for (String key : exporter.getHeaders().keySet()) { + builder.addHeader(key, exporter.getHeaders().get(key)); + } + } + return JaegerThriftSpanExporter.builder().setEndpoint(exporter.getEndpoint()).build(); + } else { + OtlpGrpcSpanExporterBuilder builder = OtlpGrpcSpanExporter.builder().setEndpoint(exporter.getEndpoint()); + if (exporter.getTimeout() != null && !exporter.getTimeout().isZero()) { + builder.setTimeout(exporter.getTimeout()); + } + if (exporter.getCompression() != null && !Objects.equals(exporter.getCompression(), "")) { + builder.setCompression(exporter.getCompression()); + } + if (exporter.getHeaders() != null) { + for (String key : exporter.getHeaders().keySet()) { + builder.addHeader(key, exporter.getHeaders().get(key)); + } + } + return builder.build(); + } + } + + private static SpanExporter createJaegerExporter(TracingConfig.Exporter exporter) { + String protocol = exporter.getProtocol(); + if (protocol != null && Objects.equals(protocol, Protocol_HTTP)) { + return JaegerThriftSpanExporter.builder().setEndpoint(exporter.getEndpoint()).build(); + } else { + JaegerGrpcSpanExporterBuilder builder = JaegerGrpcSpanExporter.builder().setEndpoint(exporter.getEndpoint()); + if (exporter.getTimeout() != null && !exporter.getTimeout().isZero()) { + builder.setTimeout(exporter.getTimeout()); + } + if (exporter.getCompression() != null && !Objects.equals(exporter.getCompression(), "")) { + builder.setCompression(exporter.getCompression()); + } + + return builder.build(); + } + } + + @Override + public void executeWithTracing(HttpRequest httpRequest, Callback callback) throws Exception { + Map carrier = new HashMap<>(); + for (String key : httpRequest.getHeaders().keySet()) { + carrier.put(key, httpRequest.getHeaders().get(key).get(0)); + } + + executeWithTracing(carrier, callback); + } + + @Override + public void executeWithTracing(CloudEvent event, Callback callback) throws Exception { + Map carrier = new HashMap<>(); + for (String key : event.getExtensionNames()) { + Object obj = event.getExtension(key); + carrier.put(key, obj == null ? "" : obj.toString()); + } + + executeWithTracing(carrier, callback); + } + + @Override + public void executeWithTracing(TopicEvent event, Callback callback) throws Exception { + executeWithTracing(new HashMap<>(), callback); + } + + @Override + public void executeWithTracing(BindingEvent event, Callback callback) throws Exception { + executeWithTracing(new HashMap<>(), callback); + } + + @Override + public void executeWithTracing(Callback callback) throws Exception { + executeWithTracing("function", SpanKind.INTERNAL, null, callback); + } + + @Override + public void executeWithTracing(Plugin plugin, Callback callback) throws Exception { + Map tags = new HashMap<>(); + tags.put("kind", "Plugin"); + tags.put("name", plugin.name()); + tags.put("version", plugin.version()); + if (plugin.tagsAddToTracing() != null) { + tags.putAll(plugin.tagsAddToTracing()); + } + + executeWithTracing(plugin.name(), SpanKind.INTERNAL, tags, callback); + } + + @Override + public void executeWithTracing(UserContext ctx, Callback callback) throws Exception { + SpanKind kind = SpanKind.SERVER; + if (ctx.getHttpRequest() == null) { + Map inputs = ctx.getInputs(); + if (inputs != null && !inputs.isEmpty()) { + kind = SpanKind.CONSUMER; + } else { + kind = SpanKind.PRODUCER; + } + } + + Map tags = new HashMap<>(); + tags.put("function", ctx.getFunctionClass().getName()); + + executeWithTracing(ctx.getFunctionClass().getSimpleName(), kind, tags, callback); + } + + public void executeWithTracing(Map carrier, Callback callback) throws Exception { + TextMapPropagator propagator = GlobalOpenTelemetry.getPropagators().getTextMapPropagator(); + Context parentContext = propagator.extract(Context.root(), carrier, getter); + Tracer tracer = GlobalOpenTelemetry.getTracer(OTEL_LIBRARY_NAME, OTEL_LIBRARY_VERSION); + Span span = tracer.spanBuilder(functionName) + .setParent(parentContext) + .setSpanKind(SpanKind.SERVER) + .startSpan(); + + setGlobalAttribute(span); + try (Scope ignored = span.makeCurrent()) { + endSpan(span, callback.execute()); + } + } + + private void executeWithTracing(String name, SpanKind kind, Map tags, Callback callback) throws + Exception { + Tracer tracer = GlobalOpenTelemetry.getTracer(OTEL_LIBRARY_NAME, OTEL_LIBRARY_VERSION); + Span span = tracer.spanBuilder(name) + .setSpanKind(kind) + .startSpan(); + span.setAttribute(SemanticAttributes.FAAS_INVOKED_NAME, this.functionName); + span.setAttribute(SemanticAttributes.FAAS_INVOKED_PROVIDER, "OpenFunction"); + + if (tags != null) { + for (String key : tags.keySet()) { + span.setAttribute(key, tags.get(key)); + } + } + + setGlobalAttribute(span); + + try (Scope ignored = span.makeCurrent()) { + endSpan(span, callback.execute()); + } + } + + private void endSpan(Span span, Error error) { + if (error != null) { + span.setStatus(StatusCode.ERROR, error.getMessage()); + } + + span.end(); + } + + private void setGlobalAttribute(Span span) { + span.setAttribute(SemanticAttributes.FAAS_INVOKED_NAME, this.functionName); + span.setAttribute(SemanticAttributes.FAAS_INVOKED_PROVIDER, "OpenFunction"); + + if (tags != null) { + for (String key : tags.keySet()) { + span.setAttribute(AttributeKey.stringKey(key), tags.get(key)); + } + } + } +} diff --git a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/tracing/SkywalkingProvider.java b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/tracing/SkywalkingProvider.java new file mode 100644 index 00000000..c349fcc0 --- /dev/null +++ b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/tracing/SkywalkingProvider.java @@ -0,0 +1,76 @@ +/* +Copyright 2022 The OpenFunction Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dev.openfunction.invoker.tracing; + +import dev.openfunction.functions.BindingEvent; +import dev.openfunction.functions.HttpRequest; +import dev.openfunction.functions.Plugin; +import dev.openfunction.functions.TopicEvent; +import dev.openfunction.invoker.Callback; +import dev.openfunction.invoker.context.UserContext; +import io.cloudevents.CloudEvent; + +import java.util.Map; + +public class SkywalkingProvider implements TracingProvider { + /** + * ... + */ + private static final int componentIDOpenFunction = 5013; + +// private final Map tags; +// private final Map baggage; + + public SkywalkingProvider() { + + } + + @Override + public void executeWithTracing(HttpRequest httpRequest, Callback callback) throws Exception { + + } + + @Override + public void executeWithTracing(CloudEvent event, Callback callback) throws Exception { + + } + + @Override + public void executeWithTracing(BindingEvent event, Callback callback) throws Exception { + + } + + @Override + public void executeWithTracing(TopicEvent event, Callback callback) throws Exception { + + } + + @Override + public void executeWithTracing(Callback callback) throws Exception { + + } + + @Override + public void executeWithTracing(Plugin plugin, Callback callback) throws Exception { + + } + + @Override + public void executeWithTracing(UserContext ctx, Callback callback) throws Exception { + + } +} diff --git a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/tracing/TracingProvider.java b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/tracing/TracingProvider.java new file mode 100644 index 00000000..9bf54b20 --- /dev/null +++ b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/tracing/TracingProvider.java @@ -0,0 +1,27 @@ +package dev.openfunction.invoker.tracing; + +import dev.openfunction.functions.BindingEvent; +import dev.openfunction.functions.HttpRequest; +import dev.openfunction.functions.Plugin; +import dev.openfunction.functions.TopicEvent; +import dev.openfunction.invoker.Callback; +import dev.openfunction.invoker.context.UserContext; +import io.cloudevents.CloudEvent; + +import java.util.Map; + +public interface TracingProvider { + void executeWithTracing(HttpRequest httpRequest, Callback callback) throws Exception; + + void executeWithTracing(CloudEvent event, Callback callback) throws Exception; + + void executeWithTracing(TopicEvent event, Callback callback) throws Exception; + + void executeWithTracing(BindingEvent event, Callback callback) throws Exception; + + void executeWithTracing(Callback callback)throws Exception; + + void executeWithTracing(Plugin plugin, Callback callback)throws Exception; + + void executeWithTracing(UserContext ctx, Callback callback)throws Exception; +} diff --git a/functions-framework-invoker/src/main/resources/nocalhost/components.yaml b/functions-framework-invoker/src/main/resources/nocalhost/components.yaml index addb2652..70663101 100644 --- a/functions-framework-invoker/src/main/resources/nocalhost/components.yaml +++ b/functions-framework-invoker/src/main/resources/nocalhost/components.yaml @@ -8,7 +8,25 @@ spec: metadata: - name: schedule value: "@every 2s" - +--- +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: kafka-output +spec: + type: bindings.kafka + version: v1 + metadata: + - name: brokers + value: "kafka-server-kafka-brokers:9092" + - name: topics + value: "topic-test" + - name: consumerGroup + value: "topic-test" + - name: publishTopic + value: "topic-test" + - name: authRequired + value: "false" --- apiVersion: dapr.io/v1alpha1 kind: Component @@ -24,3 +42,7 @@ spec: value: "group1" - name: authRequired value: "false" + - name: allowedTopics + value: "topic-test,topic-otel-1" + - name: consumerID + value: "topic-test" diff --git a/functions-framework-invoker/src/main/resources/nocalhost/config.yaml b/functions-framework-invoker/src/main/resources/nocalhost/config.yaml index 9db106ba..1944670a 100644 --- a/functions-framework-invoker/src/main/resources/nocalhost/config.yaml +++ b/functions-framework-invoker/src/main/resources/nocalhost/config.yaml @@ -45,8 +45,9 @@ containers: - name: FUNCTION_TARGET value: dev.openfunction.samples.AsyncFunctionImpl - name: FUNC_CONTEXT - value: "{\"name\":\"sample-binding\",\"version\":\"v2.0.0\",\"inputs\":{\"cron\":{\"uri\":\"cron\",\"componentName\":\"cron-input\",\"componentType\":\"bindings.cron\"}},\"outputs\":{\"sample\":{\"uri\":\"sample-topic\",\"componentName\":\"msg\",\"componentType\":\"bindings.kafka\",}},\"runtime\":\"Async\",\"port\":\"8080\",\"prePlugins\":[\"dev.openfunction.samples.plugins.ExamplePlugin\"],\"postPlugins\":[\"dev.openfunction.samples.plugins.ExamplePlugin\"]}" #value: "{\"name\":\"sample-pubsub\",\"version\":\"v2.0.0\",\"inputs\":{\"sub\":{\"uri\":\"sample-topic\",\"componentName\":\"msg\",\"componentType\":\"pubsub.kafka\"}},\"outputs\":{},\"runtime\":\"Async\",\"port\":\"8080\",\"prePlugins\":[\"dev.openfunction.samples.plugin.ExamplePlugin\"],\"postPlugins\":[\"dev.openfunction.samples.plugin.ExamplePlugin\"]}" + # function context for binding with tracing + #value:"{\"name\":\"sample-binding\",\"version\":\"v2.0.0\",\"inputs\":{\"cron\":{\"componentName\":\"cron-input\",\"componentType\":\"bindings.cron\"}},\"outputs\":{\t\t\t\t\"kafka\":{\t\t\t\t\"uri\":\"topic-test\",\t\t\t\t\"componentName\":\"kafka-output\",\"componentType\":\"bindings.kafka\",\"operation\":\"create\"\t\t\t\t}},\"runtime\":\"Async\",\"port\":\"8080\",\"prePlugins\":[\"dev.openfunction.samples.plugins.ExamplePlugin\"],\"postPlugins\":[\"dev.openfunction.samples.plugins.ExamplePlugin\"],\"pluginsTracing\":{\"enabled\":true,\"provider\":{\"name\":\"opentelemetry\"},\"tags\":{\"func\":\"sample-binding\",\"layer\":\"faas\"},\"baggage\":{\"key\":\"opentelemetry\",\"value\":\"v1.23.0\"}}}" portForward: [] patches: - patch: '{"spec":{"template":{"metadata":{"annotations":{"dapr.io/app-id":"cron-input-kafka-output-default", "dapr.io/app-port": "8080", "dapr.io/app-protocol":"grpc","dapr.io/enabled":"true","dapr.io/log-as-json": "true","dapr.io/log-level":"debug"}}}}}' diff --git a/functions-framework-invoker/src/main/resources/nocalhost/debug.sh b/functions-framework-invoker/src/main/resources/nocalhost/debug.sh index 7252f9ee..1137002e 100755 --- a/functions-framework-invoker/src/main/resources/nocalhost/debug.sh +++ b/functions-framework-invoker/src/main/resources/nocalhost/debug.sh @@ -1,4 +1,4 @@ #!/bin/bash mvn clean compile dependency:copy-dependencies -mvn exec:exec -Dexec.executable="java" -Dexec.args="-classpath target/classes/:target/dependency/* -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005 dev.openfunction.invoker.Runner" +mvn exec:exec -Dexec.executable="java" -Dexec.args="-classpath samples-1.0-SNAPSHOT.jar:target/classes/:target/dependency/* -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005 dev.openfunction.invoker.Runner" diff --git a/functions-framework-invoker/src/main/resources/nocalhost/run.sh b/functions-framework-invoker/src/main/resources/nocalhost/run.sh index 318ffc44..04590525 100755 --- a/functions-framework-invoker/src/main/resources/nocalhost/run.sh +++ b/functions-framework-invoker/src/main/resources/nocalhost/run.sh @@ -1,4 +1,4 @@ #!/bin/bash mvn clean compile dependency:copy-dependencies -mvn exec:exec -Dexec.executable="java" -Dexec.args="-classpath target/classes/:target/dependency/* dev.openfunction.invoker.Runner" +mvn exec:exec -Dexec.executable="java" -Dexec.args="-classpath samples-1.0-SNAPSHOT.jar:target/classes/:target/dependency/* dev.openfunction.invoker.Runner" diff --git a/samples/pom.xml b/samples/pom.xml index 98ff9f86..abab19d6 100644 --- a/samples/pom.xml +++ b/samples/pom.xml @@ -31,7 +31,7 @@ dev.openfunction.functions functions-framework-api - 1.0.0-SNAPSHOT + 1.1.0-SNAPSHOT diff --git a/samples/src/main/java/dev/openfunction/samples/plugins/ExamplePlugin.java b/samples/src/main/java/dev/openfunction/samples/plugins/ExamplePlugin.java index f11f8015..a978fdef 100644 --- a/samples/src/main/java/dev/openfunction/samples/plugins/ExamplePlugin.java +++ b/samples/src/main/java/dev/openfunction/samples/plugins/ExamplePlugin.java @@ -21,6 +21,7 @@ import java.text.SimpleDateFormat; import java.util.Date; +import java.util.Map; public class ExamplePlugin implements Plugin { private int seq = 0; @@ -74,4 +75,14 @@ private void execHook(Context ctx, String type) { public Object getField(String fieldName) { return null; } + + @Override + public Boolean needToTracing() { + return true; + } + + @Override + public Map tagsAddToTracing() { + return null; + } }